// (C) Copyright 2024- ECMWF and individual contributors.
//
// This software is licensed under the terms of the Apache Licence Version 2.0
// which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
// In applying this licence, ECMWF does not waive the privileges and immunities
// granted to it by virtue of its status as an intergovernmental organisation nor
// does it submit to any jurisdiction.
//! [`NotificationStream`]: the consumer's view of a live watch session.
//!
//! Backed by a bounded [`tokio::sync::mpsc`] channel that the supervisor
//! task fills. Dropping the stream signals cooperative cancellation to the
//! supervisor via a [`tokio::sync::oneshot`] channel held in the stream;
//! the supervisor `select!`s on the matching receiver and exits cleanly.
//! No `JoinHandle::abort` is involved.
use Pin;
use ;
use Stream;
use ;
use crate::;
/// Asynchronous stream of [`Notification`] values from a single watch
/// session.
///
/// `NotificationStream` is single-consumer; it deliberately does not
/// implement [`Clone`]. Callers that want to fan a single watch out to
/// multiple consumers should tee through [`tokio::sync::broadcast`] (or
/// similar) themselves; the single-consumer contract keeps checkpoint
/// advancement and trigger semantics unambiguous.
///
/// The stream items are `Result<Notification, ClientError>` so transport,
/// decode, and stream-protocol failures surface inline rather than being
/// silently swallowed. After the first terminal error, the stream yields
/// `None` on subsequent polls; the supervisor task has already exited.
///
/// # Cancellation
///
/// Dropping the stream cancels the supervisor task cooperatively: the
/// internal cancellation oneshot's `Sender` is dropped, which closes the
/// matching `Receiver` the supervisor is `select!`-ing on. The supervisor
/// finishes its current syscall, observes the cancel, and exits within one
/// event-loop tick. Notifications still buffered in the channel are
/// discarded, which is harmless because the consumer has already dropped
/// the stream.
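///
/// The drop-to-cancel pattern can be sketched with `std` threads and
/// channels standing in for the tokio oneshot and `select!`. This is an
/// illustration under that assumption, not this crate's implementation;
/// `Handle` and `spawn_worker` are hypothetical names.
///
/// ```rust
/// use std::sync::mpsc::{channel, Sender, TryRecvError};
/// use std::thread::{self, JoinHandle};
/// use std::time::Duration;
///
/// // Hypothetical stand-in for the stream: it only owns the cancel sender.
/// struct Handle {
///     _cancel: Sender<()>,
/// }
///
/// // Spawns a worker that keeps ticking until the `Handle` is dropped,
/// // then returns the number of ticks it completed.
/// fn spawn_worker() -> (Handle, JoinHandle<u64>) {
///     let (cancel_tx, cancel_rx) = channel::<()>();
///     let worker = thread::spawn(move || {
///         let mut ticks = 0u64;
///         loop {
///             // `Disconnected` means the sender was dropped: that is the
///             // cancel signal. Nothing is ever sent on this channel.
///             if let Err(TryRecvError::Disconnected) = cancel_rx.try_recv() {
///                 return ticks;
///             }
///             ticks += 1;
///             thread::sleep(Duration::from_millis(1));
///         }
///     });
///     (Handle { _cancel: cancel_tx }, worker)
/// }
/// ```
///
/// Dropping the `Handle` returned by `spawn_worker` is enough for the
/// worker's `JoinHandle` to complete; no explicit abort call is needed.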
///
/// # Backpressure
///
/// The internal channel is bounded at a fixed capacity of 128. A slow
/// consumer that lets the channel fill applies TCP backpressure all the
/// way upstream: the supervisor's `send` `await`s, so it stops reading
/// bytes from the socket; the kernel's receive buffer fills and the
/// advertised TCP receive window shrinks, which throttles the server.
/// No notifications are dropped and no internal buffer grows without
/// bound.
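///
/// The bounded-channel behaviour can be sketched with
/// `std::sync::mpsc::sync_channel`, used here as a synchronous stand-in
/// for the tokio channel (an assumption made to keep the example
/// self-contained; `sends_before_full` is a hypothetical helper).
///
/// ```rust
/// use std::sync::mpsc::{sync_channel, TrySendError};
///
/// // Fills a bounded channel that nobody is draining and returns how many
/// // sends were accepted before the channel reported that it was full.
/// fn sends_before_full(capacity: usize) -> usize {
///     // `_rx` stays alive until the function returns, so the channel is
///     // full rather than disconnected.
///     let (tx, _rx) = sync_channel::<u32>(capacity);
///     let mut accepted = 0;
///     loop {
///         match tx.try_send(0) {
///             Ok(()) => accepted += 1,
///             // A blocking `send` would park the producer at this point,
///             // which is where backpressure starts.
///             Err(TrySendError::Full(_)) => return accepted,
///             Err(TrySendError::Disconnected(_)) => return accepted,
///         }
///     }
/// }
/// ```
///
/// With a capacity of 128, `sends_before_full(128)` accepts 128 items and
/// then stops, mirroring the point at which the supervisor's `send` would
/// start waiting.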