Skip to main content

iterm2_client/
notification.rs

1//! Notification streams with typed filtering.
2//!
3//! Use `NotificationStream` for raw notifications, or typed helpers like
4//! `keystroke_notifications` and `new_session_notifications` to filter for
5//! specific event types.
6
7use crate::proto;
8use futures_util::Stream;
9use std::pin::Pin;
10use std::task::{Context, Poll};
11use tokio::sync::broadcast;
12
13/// A stream of all iTerm2 notifications (unfiltered).
14///
15/// Wraps a `broadcast::Receiver` and implements `futures_util::Stream`.
16pub struct NotificationStream {
17    rx: broadcast::Receiver<proto::Notification>,
18}
19
20impl NotificationStream {
21    /// Create a new notification stream from a broadcast receiver.
22    pub fn new(rx: broadcast::Receiver<proto::Notification>) -> Self {
23        Self { rx }
24    }
25}
26
27impl Stream for NotificationStream {
28    type Item = proto::Notification;
29
30    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
31        loop {
32            match self.rx.try_recv() {
33                Ok(notif) => return Poll::Ready(Some(notif)),
34                Err(broadcast::error::TryRecvError::Empty) => {
35                    cx.waker().wake_by_ref();
36                    return Poll::Pending;
37                }
38                Err(broadcast::error::TryRecvError::Lagged(_)) => {
39                    // Messages were dropped. Return Pending and retry on
40                    // next poll to avoid busy-spinning.
41                    cx.waker().wake_by_ref();
42                    return Poll::Pending;
43                }
44                Err(broadcast::error::TryRecvError::Closed) => return Poll::Ready(None),
45            }
46        }
47    }
48}
49
50/// Stream of keystroke notifications, filtering out all other types.
51pub fn keystroke_notifications(
52    rx: broadcast::Receiver<proto::Notification>,
53) -> impl Stream<Item = proto::KeystrokeNotification> {
54    futures_util::stream::unfold(rx, |mut rx| async move {
55        loop {
56            match rx.recv().await {
57                Ok(notif) => {
58                    if let Some(k) = notif.keystroke_notification {
59                        return Some((k, rx));
60                    }
61                }
62                Err(broadcast::error::RecvError::Lagged(_)) => continue,
63                Err(broadcast::error::RecvError::Closed) => return None,
64            }
65        }
66    })
67}
68
69/// Stream of screen update notifications.
70pub fn screen_update_notifications(
71    rx: broadcast::Receiver<proto::Notification>,
72) -> impl Stream<Item = proto::ScreenUpdateNotification> {
73    futures_util::stream::unfold(rx, |mut rx| async move {
74        loop {
75            match rx.recv().await {
76                Ok(notif) => {
77                    if let Some(n) = notif.screen_update_notification {
78                        return Some((n, rx));
79                    }
80                }
81                Err(broadcast::error::RecvError::Lagged(_)) => continue,
82                Err(broadcast::error::RecvError::Closed) => return None,
83            }
84        }
85    })
86}
87
88/// Stream of prompt notifications (prompt shown, command started, command ended).
89pub fn prompt_notifications(
90    rx: broadcast::Receiver<proto::Notification>,
91) -> impl Stream<Item = proto::PromptNotification> {
92    futures_util::stream::unfold(rx, |mut rx| async move {
93        loop {
94            match rx.recv().await {
95                Ok(notif) => {
96                    if let Some(n) = notif.prompt_notification {
97                        return Some((n, rx));
98                    }
99                }
100                Err(broadcast::error::RecvError::Lagged(_)) => continue,
101                Err(broadcast::error::RecvError::Closed) => return None,
102            }
103        }
104    })
105}
106
107/// Stream of new session creation notifications.
108pub fn new_session_notifications(
109    rx: broadcast::Receiver<proto::Notification>,
110) -> impl Stream<Item = proto::NewSessionNotification> {
111    futures_util::stream::unfold(rx, |mut rx| async move {
112        loop {
113            match rx.recv().await {
114                Ok(notif) => {
115                    if let Some(n) = notif.new_session_notification {
116                        return Some((n, rx));
117                    }
118                }
119                Err(broadcast::error::RecvError::Lagged(_)) => continue,
120                Err(broadcast::error::RecvError::Closed) => return None,
121            }
122        }
123    })
124}
125
126/// Stream of session termination notifications.
127pub fn terminate_session_notifications(
128    rx: broadcast::Receiver<proto::Notification>,
129) -> impl Stream<Item = proto::TerminateSessionNotification> {
130    futures_util::stream::unfold(rx, |mut rx| async move {
131        loop {
132            match rx.recv().await {
133                Ok(notif) => {
134                    if let Some(n) = notif.terminate_session_notification {
135                        return Some((n, rx));
136                    }
137                }
138                Err(broadcast::error::RecvError::Lagged(_)) => continue,
139                Err(broadcast::error::RecvError::Closed) => return None,
140            }
141        }
142    })
143}
144
145/// Stream of focus change notifications (window, tab, session, or app focus).
146pub fn focus_changed_notifications(
147    rx: broadcast::Receiver<proto::Notification>,
148) -> impl Stream<Item = proto::FocusChangedNotification> {
149    futures_util::stream::unfold(rx, |mut rx| async move {
150        loop {
151            match rx.recv().await {
152                Ok(notif) => {
153                    if let Some(n) = notif.focus_changed_notification {
154                        return Some((n, rx));
155                    }
156                }
157                Err(broadcast::error::RecvError::Lagged(_)) => continue,
158                Err(broadcast::error::RecvError::Closed) => return None,
159            }
160        }
161    })
162}
163
164/// Stream of layout change notifications (windows/tabs/sessions restructured).
165pub fn layout_changed_notifications(
166    rx: broadcast::Receiver<proto::Notification>,
167) -> impl Stream<Item = proto::LayoutChangedNotification> {
168    futures_util::stream::unfold(rx, |mut rx| async move {
169        loop {
170            match rx.recv().await {
171                Ok(notif) => {
172                    if let Some(n) = notif.layout_changed_notification {
173                        return Some((n, rx));
174                    }
175                }
176                Err(broadcast::error::RecvError::Lagged(_)) => continue,
177                Err(broadcast::error::RecvError::Closed) => return None,
178            }
179        }
180    })
181}
182
183/// Stream of variable change notifications.
184pub fn variable_changed_notifications(
185    rx: broadcast::Receiver<proto::Notification>,
186) -> impl Stream<Item = proto::VariableChangedNotification> {
187    futures_util::stream::unfold(rx, |mut rx| async move {
188        loop {
189            match rx.recv().await {
190                Ok(notif) => {
191                    if let Some(n) = notif.variable_changed_notification {
192                        return Some((n, rx));
193                    }
194                }
195                Err(broadcast::error::RecvError::Lagged(_)) => continue,
196                Err(broadcast::error::RecvError::Closed) => return None,
197            }
198        }
199    })
200}
201
202/// Stream of custom escape sequence notifications (OSC 1337).
203pub fn custom_escape_sequence_notifications(
204    rx: broadcast::Receiver<proto::Notification>,
205) -> impl Stream<Item = proto::CustomEscapeSequenceNotification> {
206    futures_util::stream::unfold(rx, |mut rx| async move {
207        loop {
208            match rx.recv().await {
209                Ok(notif) => {
210                    if let Some(n) = notif.custom_escape_sequence_notification {
211                        return Some((n, rx));
212                    }
213                }
214                Err(broadcast::error::RecvError::Lagged(_)) => continue,
215                Err(broadcast::error::RecvError::Closed) => return None,
216            }
217        }
218    })
219}