Skip to main content

taskers_runtime/
signals.rs

1use std::collections::HashMap;
2
3use base64::{Engine as _, engine::general_purpose::STANDARD};
4use taskers_domain::{SignalEvent, SignalKind, SignalPaneMetadata};
5
6const OSC_PREFIX: &str = "\u{1b}]";
7const BEL: char = '\u{7}';
8const ST: &str = "\u{1b}\\";
9
10#[derive(Debug, Clone, PartialEq, Eq)]
11pub struct ParsedSignal {
12    pub kind: SignalKind,
13    pub message: Option<String>,
14    pub metadata: Option<SignalPaneMetadata>,
15}
16
17#[derive(Debug, Clone, PartialEq, Eq)]
18pub struct ParsedNotification {
19    pub title: Option<String>,
20    pub subtitle: Option<String>,
21    pub body: Option<String>,
22    pub external_id: Option<String>,
23}
24
25#[derive(Debug, Clone, PartialEq, Eq)]
26pub enum ParsedTerminalEvent {
27    Signal(ParsedSignal),
28    Notification(ParsedNotification),
29}
30
31#[derive(Debug, Default, Clone)]
32pub struct SignalStreamParser {
33    pending: String,
34    kitty_notification_drafts: HashMap<String, NotificationDraft>,
35}
36
37impl ParsedSignal {
38    pub fn into_event(self, source: impl Into<String>) -> SignalEvent {
39        SignalEvent::with_metadata(source, self.kind, self.message, self.metadata)
40    }
41}
42
43#[derive(Debug, Default, Clone)]
44struct NotificationDraft {
45    title: Option<String>,
46    subtitle: Option<String>,
47    body: Option<String>,
48}
49
50pub fn parse_terminal_events(buffer: &str) -> Vec<ParsedTerminalEvent> {
51    let mut parser = SignalStreamParser::default();
52    parser.push_events(buffer)
53}
54
55pub fn parse_signal_frames(buffer: &str) -> Vec<ParsedSignal> {
56    let mut parser = SignalStreamParser::default();
57    parser.push(buffer)
58}
59
60impl SignalStreamParser {
61    pub fn push(&mut self, chunk: &str) -> Vec<ParsedSignal> {
62        self.push_events(chunk)
63            .into_iter()
64            .filter_map(|event| match event {
65                ParsedTerminalEvent::Signal(signal) => Some(signal),
66                ParsedTerminalEvent::Notification(_) => None,
67            })
68            .collect()
69    }
70
71    pub fn push_events(&mut self, chunk: &str) -> Vec<ParsedTerminalEvent> {
72        self.pending.push_str(chunk);
73
74        let mut events = Vec::new();
75        let mut cursor = 0usize;
76        let mut keep_from = floor_char_boundary(
77            &self.pending,
78            self.pending.len().saturating_sub(OSC_PREFIX.len()),
79        );
80
81        while let Some(found) = self.pending[cursor..].find(OSC_PREFIX) {
82            let frame_start = cursor + found;
83            let content_start = frame_start + OSC_PREFIX.len();
84            let remainder = &self.pending[content_start..];
85
86            let Some((raw_frame, consumed)) = frame_slice(remainder) else {
87                keep_from = frame_start;
88                break;
89            };
90
91            let raw_frame = raw_frame.to_string();
92            if let Some(parsed) = self.parse_frame(&raw_frame) {
93                events.push(parsed);
94            }
95
96            cursor = content_start + consumed;
97            keep_from = cursor;
98        }
99
100        self.pending = self.pending[floor_char_boundary(&self.pending, keep_from)..].to_string();
101        events
102    }
103
104    fn parse_frame(&mut self, frame: &str) -> Option<ParsedTerminalEvent> {
105        if let Some(frame) = frame.strip_prefix("777;taskers;") {
106            return parse_taskers_frame(frame).map(ParsedTerminalEvent::Signal);
107        }
108        if let Some(frame) = frame.strip_prefix("777;notify;") {
109            return parse_rxvt_notification(frame).map(ParsedTerminalEvent::Notification);
110        }
111        if let Some(frame) = frame.strip_prefix("99;") {
112            return self
113                .parse_kitty_notification(frame)
114                .map(ParsedTerminalEvent::Notification);
115        }
116        None
117    }
118}
119
120fn parse_taskers_frame(frame: &str) -> Option<ParsedSignal> {
121    let mut kind = None;
122    let mut message = None;
123    let mut title = None;
124    let mut cwd = None;
125    let mut repo_name = None;
126    let mut git_branch = None;
127    let mut agent_kind = None;
128    let mut agent_active = None;
129    let mut ports = None;
130
131    for part in frame.split(';') {
132        let (key, value) = part.split_once('=')?;
133        match key {
134            "kind" => {
135                kind = Some(match value {
136                    "metadata" => SignalKind::Metadata,
137                    "started" => SignalKind::Started,
138                    "progress" => SignalKind::Progress,
139                    "completed" => SignalKind::Completed,
140                    "waiting_input" => SignalKind::WaitingInput,
141                    "error" => SignalKind::Error,
142                    "notification" => SignalKind::Notification,
143                    _ => return None,
144                });
145            }
146            "message" => message = percent_decode(value),
147            "message_b64" => message = decode_base64(value),
148            "title" => title = percent_decode(value),
149            "title_b64" => title = decode_base64(value),
150            "cwd" => cwd = percent_decode(value),
151            "cwd_b64" => cwd = decode_base64(value),
152            "repo" | "repo_name" => repo_name = percent_decode(value),
153            "repo_b64" | "repo_name_b64" => repo_name = decode_base64(value),
154            "branch" | "git_branch" => git_branch = percent_decode(value),
155            "branch_b64" | "git_branch_b64" => git_branch = decode_base64(value),
156            "agent" | "agent_kind" => agent_kind = percent_decode(value),
157            "agent_b64" | "agent_kind_b64" => agent_kind = decode_base64(value),
158            "agent_active" => agent_active = parse_bool(value),
159            "agent_active_b64" => {
160                agent_active = decode_base64(value).and_then(|decoded| parse_bool(&decoded))
161            }
162            "ports" => ports = parse_ports(value),
163            "ports_b64" => ports = decode_base64(value).and_then(|decoded| parse_ports(&decoded)),
164            _ => {}
165        }
166    }
167
168    let metadata = if title.is_some()
169        || cwd.is_some()
170        || repo_name.is_some()
171        || git_branch.is_some()
172        || agent_kind.is_some()
173        || agent_active.is_some()
174        || ports.is_some()
175    {
176        Some(SignalPaneMetadata {
177            title,
178            agent_title: None,
179            cwd,
180            repo_name,
181            git_branch,
182            ports: ports.unwrap_or_default(),
183            agent_kind,
184            agent_active,
185        })
186    } else {
187        None
188    };
189
190    Some(ParsedSignal {
191        kind: kind?,
192        message,
193        metadata,
194    })
195}
196
197fn parse_rxvt_notification(frame: &str) -> Option<ParsedNotification> {
198    let (title, body) = match frame.split_once(';') {
199        Some((title, body)) => (title, Some(body)),
200        None => (frame, None),
201    };
202
203    let title = Some(title.to_string()).filter(|value| !value.is_empty());
204    let body = body.map(str::to_string).filter(|value| !value.is_empty());
205
206    if title.is_none() && body.is_none() {
207        return None;
208    }
209
210    Some(ParsedNotification {
211        title,
212        subtitle: None,
213        body,
214        external_id: None,
215    })
216}
217
218impl SignalStreamParser {
219    fn parse_kitty_notification(&mut self, frame: &str) -> Option<ParsedNotification> {
220        let (param_tokens, payload) = split_kitty_params_and_payload(frame);
221        let mut external_id = None;
222        let mut part = None;
223        let mut done = None;
224
225        for token in param_tokens {
226            let (key, value) = token.split_once('=')?;
227            match key {
228                "i" => {
229                    external_id = Some(value.to_string()).filter(|value| !value.is_empty());
230                }
231                "p" => {
232                    part = Some(value.to_ascii_lowercase());
233                }
234                "d" => {
235                    done = match value {
236                        "0" => Some(false),
237                        "1" => Some(true),
238                        _ => None,
239                    };
240                }
241                "e" => {}
242                _ => {}
243            }
244        }
245
246        let mut draft = external_id
247            .as_ref()
248            .and_then(|id| self.kitty_notification_drafts.remove(id))
249            .unwrap_or_default();
250
251        let payload = Some(payload.to_string()).filter(|value| !value.is_empty());
252        match part.as_deref() {
253            Some("title") => draft.title = payload,
254            Some("subtitle") => draft.subtitle = payload,
255            Some("body") => draft.body = payload,
256            Some(_) => {}
257            None => {
258                if let Some(payload) = payload {
259                    if draft.title.is_none() {
260                        draft.title = Some(payload);
261                    } else {
262                        draft.body = Some(payload);
263                    }
264                }
265            }
266        }
267
268        let should_defer = matches!(done, Some(false)) && part.is_some();
269        if should_defer {
270            if let Some(external_id) = external_id {
271                self.kitty_notification_drafts.insert(external_id, draft);
272            }
273            return None;
274        }
275
276        if draft.title.is_none() && draft.subtitle.is_none() && draft.body.is_none() {
277            return None;
278        }
279
280        Some(ParsedNotification {
281            title: draft.title,
282            subtitle: draft.subtitle,
283            body: draft.body,
284            external_id,
285        })
286    }
287}
288
289fn split_kitty_params_and_payload(frame: &str) -> (Vec<&str>, &str) {
290    let mut params = Vec::new();
291    let mut start = 0usize;
292
293    if let Some(stripped) = frame.strip_prefix([';', ':']) {
294        return (params, stripped);
295    }
296
297    while start < frame.len() {
298        let remainder = &frame[start..];
299        let Some(separator) = remainder.find([';', ':']) else {
300            if is_kitty_param_token(remainder) {
301                params.push(remainder);
302                return (params, "");
303            }
304            return (params, remainder);
305        };
306
307        let token_end = start + separator;
308        let token = &frame[start..token_end];
309        if !is_kitty_param_token(token) {
310            return (params, &frame[start..]);
311        }
312
313        params.push(token);
314        start = token_end + 1;
315    }
316
317    (params, "")
318}
319
320fn is_kitty_param_token(token: &str) -> bool {
321    token
322        .split_once('=')
323        .is_some_and(|(key, _)| !key.is_empty())
324}
325
326fn parse_ports(value: &str) -> Option<Vec<u16>> {
327    if value.is_empty() {
328        return Some(Vec::new());
329    }
330
331    value
332        .split(',')
333        .map(|part| part.parse::<u16>().ok())
334        .collect::<Option<Vec<_>>>()
335}
336
337fn parse_bool(value: &str) -> Option<bool> {
338    match value.trim().to_ascii_lowercase().as_str() {
339        "1" | "true" | "yes" | "on" => Some(true),
340        "0" | "false" | "no" | "off" => Some(false),
341        _ => None,
342    }
343}
344
345fn decode_base64(value: &str) -> Option<String> {
346    let decoded = STANDARD.decode(value).ok()?;
347    String::from_utf8(decoded).ok()
348}
349
350fn percent_decode(value: &str) -> Option<String> {
351    let mut bytes = Vec::with_capacity(value.len());
352    let raw = value.as_bytes();
353    let mut index = 0usize;
354
355    while index < raw.len() {
356        match raw[index] {
357            b'%' if index + 2 < raw.len() => {
358                let high = decode_hex(raw[index + 1])?;
359                let low = decode_hex(raw[index + 2])?;
360                bytes.push((high << 4) | low);
361                index += 3;
362            }
363            byte => {
364                bytes.push(byte);
365                index += 1;
366            }
367        }
368    }
369
370    String::from_utf8(bytes).ok()
371}
372
373fn decode_hex(byte: u8) -> Option<u8> {
374    match byte {
375        b'0'..=b'9' => Some(byte - b'0'),
376        b'a'..=b'f' => Some(byte - b'a' + 10),
377        b'A'..=b'F' => Some(byte - b'A' + 10),
378        _ => None,
379    }
380}
381
382fn frame_slice(remainder: &str) -> Option<(&str, usize)> {
383    if let Some(end) = remainder.find(BEL) {
384        return Some((&remainder[..end], end + BEL.len_utf8()));
385    }
386    if let Some(end) = remainder.find(ST) {
387        return Some((&remainder[..end], end + ST.len()));
388    }
389    None
390}
391
392fn floor_char_boundary(value: &str, mut index: usize) -> usize {
393    index = index.min(value.len());
394    while index > 0 && !value.is_char_boundary(index) {
395        index -= 1;
396    }
397    index
398}
399
400#[cfg(test)]
401mod tests {
402    use base64::{Engine as _, engine::general_purpose::STANDARD};
403    use taskers_domain::SignalKind;
404
405    use super::{
406        ParsedTerminalEvent, SignalStreamParser, parse_signal_frames, parse_terminal_events,
407    };
408
409    #[test]
410    fn parses_multiple_frames_with_different_terminators() {
411        let output = concat!(
412            "hello",
413            "\u{1b}]777;taskers;kind=waiting_input;message=Need%20approval\u{7}",
414            "world",
415            "\u{1b}]777;taskers;kind=completed;message=Done\u{1b}\\",
416        );
417
418        let frames = parse_signal_frames(output);
419
420        assert_eq!(frames.len(), 2);
421        assert_eq!(frames[0].kind, SignalKind::WaitingInput);
422        assert_eq!(frames[0].message.as_deref(), Some("Need approval"));
423        assert_eq!(frames[1].kind, SignalKind::Completed);
424    }
425
426    #[test]
427    fn ignores_unknown_frames() {
428        let output = "\u{1b}]777;taskers;kind=unknown;message=Bad\u{7}";
429        assert!(parse_signal_frames(output).is_empty());
430    }
431
432    #[test]
433    fn signal_parser_ignores_notification_only_frames() {
434        let output = "\u{1b}]777;notify;Taskers;Body\u{7}";
435        assert!(parse_signal_frames(output).is_empty());
436    }
437
438    #[test]
439    fn stream_parser_handles_split_frames() {
440        let mut parser = SignalStreamParser::default();
441
442        assert!(
443            parser
444                .push("\u{1b}]777;taskers;kind=waiting_input;message=Need")
445                .is_empty()
446        );
447
448        let frames = parser.push("%20approval\u{7}");
449        assert_eq!(frames.len(), 1);
450        assert_eq!(frames[0].kind, SignalKind::WaitingInput);
451        assert_eq!(frames[0].message.as_deref(), Some("Need approval"));
452    }
453
454    #[test]
455    fn stream_parser_keeps_partial_prefix_on_utf8_boundary() {
456        let mut parser = SignalStreamParser::default();
457        let noisy_prefix = "abbr'...\n⠙ ";
458        let partial = format!("{noisy_prefix}\u{1b}]777;taskers;kind=progress;message=Working");
459
460        assert!(parser.push(&partial).is_empty());
461
462        let frames = parser.push("\u{7}");
463        assert_eq!(frames.len(), 1);
464        assert_eq!(frames[0].kind, SignalKind::Progress);
465        assert_eq!(frames[0].message.as_deref(), Some("Working"));
466    }
467
468    #[test]
469    fn parses_metadata_snapshots_with_base64_fields() {
470        let output = format!(
471            "\u{1b}]777;taskers;kind=metadata;cwd_b64={};repo_b64={};branch_b64={};agent_b64={};title_b64={};ports=3000,8080\u{7}",
472            STANDARD.encode("/home/notes/Projects/taskers"),
473            STANDARD.encode("taskers"),
474            STANDARD.encode("main"),
475            STANDARD.encode("codex"),
476            STANDARD.encode("codex · taskers"),
477        );
478
479        let frames = parse_signal_frames(&output);
480
481        assert_eq!(frames.len(), 1);
482        assert_eq!(frames[0].kind, SignalKind::Metadata);
483        let metadata = frames[0].metadata.as_ref().expect("metadata snapshot");
484        assert_eq!(
485            metadata.cwd.as_deref(),
486            Some("/home/notes/Projects/taskers")
487        );
488        assert_eq!(metadata.repo_name.as_deref(), Some("taskers"));
489        assert_eq!(metadata.git_branch.as_deref(), Some("main"));
490        assert_eq!(metadata.agent_kind.as_deref(), Some("codex"));
491        assert_eq!(metadata.title.as_deref(), Some("codex · taskers"));
492        assert_eq!(metadata.ports, vec![3000, 8080]);
493    }
494
495    #[test]
496    fn parses_rxvt_notification_frames() {
497        let frames = parse_terminal_events("\u{1b}]777;notify;OSC777 Title;OSC777 Body\u{7}");
498
499        assert_eq!(
500            frames,
501            vec![ParsedTerminalEvent::Notification(
502                super::ParsedNotification {
503                    title: Some("OSC777 Title".into()),
504                    subtitle: None,
505                    body: Some("OSC777 Body".into()),
506                    external_id: None,
507                }
508            )]
509        );
510    }
511
512    #[test]
513    fn parses_simple_kitty_notification_frames() {
514        let frames = parse_terminal_events("\u{1b}]99;;Kitty Simple\u{1b}\\");
515
516        assert_eq!(
517            frames,
518            vec![ParsedTerminalEvent::Notification(
519                super::ParsedNotification {
520                    title: Some("Kitty Simple".into()),
521                    subtitle: None,
522                    body: None,
523                    external_id: None,
524                }
525            )]
526        );
527    }
528
529    #[test]
530    fn parses_chunked_kitty_notification_frames() {
531        let mut parser = SignalStreamParser::default();
532
533        assert!(
534            parser
535                .push_events("\u{1b}]99;i=kitty:d=0:p=title;Kitty Title\u{1b}\\")
536                .is_empty()
537        );
538
539        let frames = parser.push_events("\u{1b}]99;i=kitty:p=body;Kitty Body\u{1b}\\");
540        assert_eq!(
541            frames,
542            vec![ParsedTerminalEvent::Notification(
543                super::ParsedNotification {
544                    title: Some("Kitty Title".into()),
545                    subtitle: None,
546                    body: Some("Kitty Body".into()),
547                    external_id: Some("kitty".into()),
548                }
549            )]
550        );
551    }
552
553    #[test]
554    fn parses_doc_style_kitty_notification_payloads() {
555        let frames = parse_terminal_events("\u{1b}]99;i=1;e=1;d=0:Hello World\u{1b}\\");
556
557        assert_eq!(
558            frames,
559            vec![ParsedTerminalEvent::Notification(
560                super::ParsedNotification {
561                    title: Some("Hello World".into()),
562                    subtitle: None,
563                    body: None,
564                    external_id: Some("1".into()),
565                }
566            )]
567        );
568    }
569}