iterm2_client/
notification.rs1use crate::proto;
8use futures_util::Stream;
9use std::pin::Pin;
10use std::task::{Context, Poll};
11use tokio::sync::broadcast;
12
13pub struct NotificationStream {
17 rx: broadcast::Receiver<proto::Notification>,
18}
19
20impl NotificationStream {
21 pub fn new(rx: broadcast::Receiver<proto::Notification>) -> Self {
23 Self { rx }
24 }
25}
26
27impl Stream for NotificationStream {
28 type Item = proto::Notification;
29
30 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
31 loop {
32 match self.rx.try_recv() {
33 Ok(notif) => return Poll::Ready(Some(notif)),
34 Err(broadcast::error::TryRecvError::Empty) => {
35 cx.waker().wake_by_ref();
36 return Poll::Pending;
37 }
38 Err(broadcast::error::TryRecvError::Lagged(_)) => {
39 cx.waker().wake_by_ref();
42 return Poll::Pending;
43 }
44 Err(broadcast::error::TryRecvError::Closed) => return Poll::Ready(None),
45 }
46 }
47 }
48}
49
50pub fn keystroke_notifications(
52 rx: broadcast::Receiver<proto::Notification>,
53) -> impl Stream<Item = proto::KeystrokeNotification> {
54 futures_util::stream::unfold(rx, |mut rx| async move {
55 loop {
56 match rx.recv().await {
57 Ok(notif) => {
58 if let Some(k) = notif.keystroke_notification {
59 return Some((k, rx));
60 }
61 }
62 Err(broadcast::error::RecvError::Lagged(_)) => continue,
63 Err(broadcast::error::RecvError::Closed) => return None,
64 }
65 }
66 })
67}
68
69pub fn screen_update_notifications(
71 rx: broadcast::Receiver<proto::Notification>,
72) -> impl Stream<Item = proto::ScreenUpdateNotification> {
73 futures_util::stream::unfold(rx, |mut rx| async move {
74 loop {
75 match rx.recv().await {
76 Ok(notif) => {
77 if let Some(n) = notif.screen_update_notification {
78 return Some((n, rx));
79 }
80 }
81 Err(broadcast::error::RecvError::Lagged(_)) => continue,
82 Err(broadcast::error::RecvError::Closed) => return None,
83 }
84 }
85 })
86}
87
88pub fn prompt_notifications(
90 rx: broadcast::Receiver<proto::Notification>,
91) -> impl Stream<Item = proto::PromptNotification> {
92 futures_util::stream::unfold(rx, |mut rx| async move {
93 loop {
94 match rx.recv().await {
95 Ok(notif) => {
96 if let Some(n) = notif.prompt_notification {
97 return Some((n, rx));
98 }
99 }
100 Err(broadcast::error::RecvError::Lagged(_)) => continue,
101 Err(broadcast::error::RecvError::Closed) => return None,
102 }
103 }
104 })
105}
106
107pub fn new_session_notifications(
109 rx: broadcast::Receiver<proto::Notification>,
110) -> impl Stream<Item = proto::NewSessionNotification> {
111 futures_util::stream::unfold(rx, |mut rx| async move {
112 loop {
113 match rx.recv().await {
114 Ok(notif) => {
115 if let Some(n) = notif.new_session_notification {
116 return Some((n, rx));
117 }
118 }
119 Err(broadcast::error::RecvError::Lagged(_)) => continue,
120 Err(broadcast::error::RecvError::Closed) => return None,
121 }
122 }
123 })
124}
125
126pub fn terminate_session_notifications(
128 rx: broadcast::Receiver<proto::Notification>,
129) -> impl Stream<Item = proto::TerminateSessionNotification> {
130 futures_util::stream::unfold(rx, |mut rx| async move {
131 loop {
132 match rx.recv().await {
133 Ok(notif) => {
134 if let Some(n) = notif.terminate_session_notification {
135 return Some((n, rx));
136 }
137 }
138 Err(broadcast::error::RecvError::Lagged(_)) => continue,
139 Err(broadcast::error::RecvError::Closed) => return None,
140 }
141 }
142 })
143}
144
145pub fn focus_changed_notifications(
147 rx: broadcast::Receiver<proto::Notification>,
148) -> impl Stream<Item = proto::FocusChangedNotification> {
149 futures_util::stream::unfold(rx, |mut rx| async move {
150 loop {
151 match rx.recv().await {
152 Ok(notif) => {
153 if let Some(n) = notif.focus_changed_notification {
154 return Some((n, rx));
155 }
156 }
157 Err(broadcast::error::RecvError::Lagged(_)) => continue,
158 Err(broadcast::error::RecvError::Closed) => return None,
159 }
160 }
161 })
162}
163
164pub fn layout_changed_notifications(
166 rx: broadcast::Receiver<proto::Notification>,
167) -> impl Stream<Item = proto::LayoutChangedNotification> {
168 futures_util::stream::unfold(rx, |mut rx| async move {
169 loop {
170 match rx.recv().await {
171 Ok(notif) => {
172 if let Some(n) = notif.layout_changed_notification {
173 return Some((n, rx));
174 }
175 }
176 Err(broadcast::error::RecvError::Lagged(_)) => continue,
177 Err(broadcast::error::RecvError::Closed) => return None,
178 }
179 }
180 })
181}
182
183pub fn variable_changed_notifications(
185 rx: broadcast::Receiver<proto::Notification>,
186) -> impl Stream<Item = proto::VariableChangedNotification> {
187 futures_util::stream::unfold(rx, |mut rx| async move {
188 loop {
189 match rx.recv().await {
190 Ok(notif) => {
191 if let Some(n) = notif.variable_changed_notification {
192 return Some((n, rx));
193 }
194 }
195 Err(broadcast::error::RecvError::Lagged(_)) => continue,
196 Err(broadcast::error::RecvError::Closed) => return None,
197 }
198 }
199 })
200}
201
202pub fn custom_escape_sequence_notifications(
204 rx: broadcast::Receiver<proto::Notification>,
205) -> impl Stream<Item = proto::CustomEscapeSequenceNotification> {
206 futures_util::stream::unfold(rx, |mut rx| async move {
207 loop {
208 match rx.recv().await {
209 Ok(notif) => {
210 if let Some(n) = notif.custom_escape_sequence_notification {
211 return Some((n, rx));
212 }
213 }
214 Err(broadcast::error::RecvError::Lagged(_)) => continue,
215 Err(broadcast::error::RecvError::Closed) => return None,
216 }
217 }
218 })
219}