Skip to main content

taskers_runtime/
signals.rs

1use std::collections::HashMap;
2
3use base64::{Engine as _, engine::general_purpose::STANDARD};
4use taskers_domain::{SignalEvent, SignalKind, SignalPaneMetadata};
5
/// Two-character introducer (`ESC ]`) that marks the start of an OSC frame.
const OSC_PREFIX: &str = "\u{1b}]";
/// BEL control character; one of the two terminators accepted by `frame_slice`.
const BEL: char = '\u{7}';
/// String Terminator (`ESC \`); the other terminator accepted by `frame_slice`.
const ST: &str = "\u{1b}\\";
9
10#[derive(Debug, Clone, PartialEq, Eq)]
11pub struct ParsedSignal {
12    pub kind: SignalKind,
13    pub message: Option<String>,
14    pub metadata: Option<SignalPaneMetadata>,
15}
16
/// A desktop notification decoded from an `777;notify;` or `99;` OSC frame.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParsedNotification {
    /// Notification title, if a non-empty one was supplied.
    pub title: Option<String>,
    /// Notification subtitle; only the kitty (`99;`) parser ever fills this in.
    pub subtitle: Option<String>,
    /// Notification body text, if a non-empty one was supplied.
    pub body: Option<String>,
    /// Value of the kitty `i=` parameter, used to correlate chunked notifications.
    pub external_id: Option<String>,
}
24
25#[derive(Debug, Clone, PartialEq, Eq)]
26pub enum ParsedTerminalEvent {
27    Signal(ParsedSignal),
28    Notification(ParsedNotification),
29}
30
31#[derive(Debug, Default, Clone)]
32pub struct SignalStreamParser {
33    pending: String,
34    kitty_notification_drafts: HashMap<String, NotificationDraft>,
35}
36
37impl ParsedSignal {
38    pub fn into_event(self, source: impl Into<String>) -> SignalEvent {
39        SignalEvent::with_metadata(source, self.kind, self.message, self.metadata)
40    }
41}
42
43#[derive(Debug, Default, Clone)]
44struct NotificationDraft {
45    title: NotificationFieldDraft,
46    subtitle: NotificationFieldDraft,
47    body: NotificationFieldDraft,
48}
49
50#[derive(Debug, Default, Clone)]
51struct NotificationFieldDraft {
52    fragments: Vec<NotificationFragment>,
53}
54
/// One payload chunk of a notification field, exactly as it appeared in the frame.
#[derive(Debug, Clone)]
struct NotificationFragment {
    /// Raw payload text of the chunk (not yet decoded).
    payload: String,
    /// Whether the chunk was flagged `e=1`, i.e. its payload is base64.
    encoded: bool,
}
60
61pub fn parse_terminal_events(buffer: &str) -> Vec<ParsedTerminalEvent> {
62    let mut parser = SignalStreamParser::default();
63    parser.push_events(buffer)
64}
65
66pub fn parse_signal_frames(buffer: &str) -> Vec<ParsedSignal> {
67    let mut parser = SignalStreamParser::default();
68    parser.push(buffer)
69}
70
71impl SignalStreamParser {
72    pub fn push(&mut self, chunk: &str) -> Vec<ParsedSignal> {
73        self.push_events(chunk)
74            .into_iter()
75            .filter_map(|event| match event {
76                ParsedTerminalEvent::Signal(signal) => Some(signal),
77                ParsedTerminalEvent::Notification(_) => None,
78            })
79            .collect()
80    }
81
82    pub fn push_events(&mut self, chunk: &str) -> Vec<ParsedTerminalEvent> {
83        self.pending.push_str(chunk);
84
85        let mut events = Vec::new();
86        let mut cursor = 0usize;
87        let mut keep_from = floor_char_boundary(
88            &self.pending,
89            self.pending.len().saturating_sub(OSC_PREFIX.len()),
90        );
91
92        while let Some(found) = self.pending[cursor..].find(OSC_PREFIX) {
93            let frame_start = cursor + found;
94            let content_start = frame_start + OSC_PREFIX.len();
95            let remainder = &self.pending[content_start..];
96
97            let Some((raw_frame, consumed)) = frame_slice(remainder) else {
98                keep_from = frame_start;
99                break;
100            };
101
102            let raw_frame = raw_frame.to_string();
103            if let Some(parsed) = self.parse_frame(&raw_frame) {
104                events.push(parsed);
105            }
106
107            cursor = content_start + consumed;
108            keep_from = cursor;
109        }
110
111        self.pending = self.pending[floor_char_boundary(&self.pending, keep_from)..].to_string();
112        events
113    }
114
115    fn parse_frame(&mut self, frame: &str) -> Option<ParsedTerminalEvent> {
116        if let Some(frame) = frame.strip_prefix("777;taskers;") {
117            return parse_taskers_frame(frame).map(ParsedTerminalEvent::Signal);
118        }
119        if let Some(frame) = frame.strip_prefix("777;notify;") {
120            return parse_rxvt_notification(frame).map(ParsedTerminalEvent::Notification);
121        }
122        if let Some(frame) = frame.strip_prefix("99;") {
123            return self
124                .parse_kitty_notification(frame)
125                .map(ParsedTerminalEvent::Notification);
126        }
127        None
128    }
129}
130
131fn parse_taskers_frame(frame: &str) -> Option<ParsedSignal> {
132    let mut kind = None;
133    let mut message = None;
134    let mut title = None;
135    let mut cwd = None;
136    let mut repo_name = None;
137    let mut git_branch = None;
138    let mut agent_kind = None;
139    let mut agent_active = None;
140    let mut ports = None;
141
142    for part in frame.split(';') {
143        let (key, value) = part.split_once('=')?;
144        match key {
145            "kind" => {
146                kind = Some(match value {
147                    "metadata" => SignalKind::Metadata,
148                    "started" => SignalKind::Started,
149                    "progress" => SignalKind::Progress,
150                    "completed" => SignalKind::Completed,
151                    "waiting_input" => SignalKind::WaitingInput,
152                    "error" => SignalKind::Error,
153                    "notification" => SignalKind::Notification,
154                    _ => return None,
155                });
156            }
157            "message" => message = percent_decode(value),
158            "message_b64" => message = decode_base64(value),
159            "title" => title = percent_decode(value),
160            "title_b64" => title = decode_base64(value),
161            "cwd" => cwd = percent_decode(value),
162            "cwd_b64" => cwd = decode_base64(value),
163            "repo" | "repo_name" => repo_name = percent_decode(value),
164            "repo_b64" | "repo_name_b64" => repo_name = decode_base64(value),
165            "branch" | "git_branch" => git_branch = percent_decode(value),
166            "branch_b64" | "git_branch_b64" => git_branch = decode_base64(value),
167            "agent" | "agent_kind" => agent_kind = percent_decode(value),
168            "agent_b64" | "agent_kind_b64" => agent_kind = decode_base64(value),
169            "agent_active" => agent_active = parse_bool(value),
170            "agent_active_b64" => {
171                agent_active = decode_base64(value).and_then(|decoded| parse_bool(&decoded))
172            }
173            "ports" => ports = parse_ports(value),
174            "ports_b64" => ports = decode_base64(value).and_then(|decoded| parse_ports(&decoded)),
175            _ => {}
176        }
177    }
178
179    let metadata = if title.is_some()
180        || cwd.is_some()
181        || repo_name.is_some()
182        || git_branch.is_some()
183        || agent_kind.is_some()
184        || agent_active.is_some()
185        || ports.is_some()
186    {
187        Some(SignalPaneMetadata {
188            title,
189            agent_title: None,
190            cwd,
191            repo_name,
192            git_branch,
193            ports: ports.unwrap_or_default(),
194            agent_kind,
195            agent_active,
196            agent_command: None,
197        })
198    } else {
199        None
200    };
201
202    Some(ParsedSignal {
203        kind: kind?,
204        message,
205        metadata,
206    })
207}
208
209fn parse_rxvt_notification(frame: &str) -> Option<ParsedNotification> {
210    let (title, body) = match frame.split_once(';') {
211        Some((title, body)) => (title, Some(body)),
212        None => (frame, None),
213    };
214
215    let title = Some(title.to_string()).filter(|value| !value.is_empty());
216    let body = body.map(str::to_string).filter(|value| !value.is_empty());
217
218    if title.is_none() && body.is_none() {
219        return None;
220    }
221
222    Some(ParsedNotification {
223        title,
224        subtitle: None,
225        body,
226        external_id: None,
227    })
228}
229
230impl SignalStreamParser {
231    fn parse_kitty_notification(&mut self, frame: &str) -> Option<ParsedNotification> {
232        let (param_tokens, payload) = split_kitty_params_and_payload(frame);
233        let mut external_id = None;
234        let mut part = None;
235        let mut done = None;
236        let mut encoded = false;
237
238        for token in param_tokens {
239            let (key, value) = token.split_once('=')?;
240            match key {
241                "i" => {
242                    external_id = Some(value.to_string()).filter(|value| !value.is_empty());
243                }
244                "p" => {
245                    part = Some(value.to_ascii_lowercase());
246                }
247                "d" => {
248                    done = match value {
249                        "0" => Some(false),
250                        "1" => Some(true),
251                        _ => None,
252                    };
253                }
254                "e" => {
255                    encoded = value == "1";
256                }
257                _ => {}
258            }
259        }
260
261        let mut draft = external_id
262            .as_ref()
263            .and_then(|id| self.kitty_notification_drafts.remove(id))
264            .unwrap_or_default();
265
266        let payload = Some(payload.to_string()).filter(|value| !value.is_empty());
267        match part.as_deref() {
268            Some("title") | None => {
269                if let Some(payload) = payload {
270                    draft.title.push(payload, encoded);
271                }
272            }
273            Some("subtitle") => {
274                if let Some(payload) = payload {
275                    draft.subtitle.push(payload, encoded);
276                }
277            }
278            Some("body") => {
279                if let Some(payload) = payload {
280                    draft.body.push(payload, encoded);
281                }
282            }
283            Some(_) => {}
284        }
285
286        let should_defer = matches!(done, Some(false));
287        if should_defer {
288            if let Some(external_id) = external_id {
289                self.kitty_notification_drafts.insert(external_id, draft);
290            }
291            return None;
292        }
293
294        let title = draft.title.into_value();
295        let subtitle = draft.subtitle.into_value();
296        let body = draft.body.into_value();
297
298        if title.is_none() && subtitle.is_none() && body.is_none() {
299            return None;
300        }
301
302        Some(ParsedNotification {
303            title,
304            subtitle,
305            body,
306            external_id,
307        })
308    }
309}
310
311fn split_kitty_params_and_payload(frame: &str) -> (Vec<&str>, &str) {
312    let mut params = Vec::new();
313    let mut start = 0usize;
314
315    if let Some(stripped) = frame.strip_prefix([';', ':']) {
316        return (params, stripped);
317    }
318
319    while start < frame.len() {
320        let remainder = &frame[start..];
321        let Some(separator) = remainder.find([';', ':']) else {
322            if is_kitty_param_token(remainder) {
323                params.push(remainder);
324                return (params, "");
325            }
326            return (params, remainder);
327        };
328
329        let token_end = start + separator;
330        let token = &frame[start..token_end];
331        if !is_kitty_param_token(token) {
332            return (params, &frame[start..]);
333        }
334
335        params.push(token);
336        start = token_end + 1;
337    }
338
339    (params, "")
340}
341
/// Returns `true` when `token` has the form `key=value` with a non-empty key.
fn is_kitty_param_token(token: &str) -> bool {
    matches!(token.split_once('='), Some((key, _)) if !key.is_empty())
}
347
/// Parses a comma-separated list of port numbers.
///
/// An empty string yields an empty list. Returns `None` as soon as any entry is not a
/// valid `u16`; entries are not trimmed, so surrounding whitespace is an error.
fn parse_ports(value: &str) -> Option<Vec<u16>> {
    let mut ports = Vec::new();
    if value.is_empty() {
        return Some(ports);
    }

    for entry in value.split(',') {
        ports.push(entry.parse::<u16>().ok()?);
    }
    Some(ports)
}
358
/// Parses a boolean flag, ignoring surrounding whitespace and ASCII case.
///
/// Accepts `1`/`true`/`yes`/`on` and `0`/`false`/`no`/`off`; anything else is `None`.
fn parse_bool(value: &str) -> Option<bool> {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];
    const FALSY: [&str; 4] = ["0", "false", "no", "off"];

    let token = value.trim();
    let is_one_of = |words: &[&str]| words.iter().any(|word| token.eq_ignore_ascii_case(word));

    if is_one_of(&TRUTHY) {
        Some(true)
    } else if is_one_of(&FALSY) {
        Some(false)
    } else {
        None
    }
}
366
367fn decode_base64(value: &str) -> Option<String> {
368    let mut normalized = value.to_string();
369    let missing_padding = normalized.len() % 4;
370    if missing_padding != 0 {
371        normalized.extend(std::iter::repeat_n('=', 4 - missing_padding));
372    }
373    let decoded = STANDARD.decode(normalized).ok()?;
374    String::from_utf8(decoded).ok()
375}
376
377impl NotificationFieldDraft {
378    fn push(&mut self, payload: String, encoded: bool) {
379        self.fragments
380            .push(NotificationFragment { payload, encoded });
381    }
382
383    fn into_value(self) -> Option<String> {
384        let mut combined = String::new();
385        let mut pending = String::new();
386        let mut pending_encoded = None;
387
388        for fragment in self.fragments {
389            match pending_encoded {
390                Some(current_encoded) if current_encoded == fragment.encoded => {
391                    pending.push_str(&fragment.payload);
392                }
393                Some(current_encoded) => {
394                    combined.push_str(&decode_notification_payload(current_encoded, &pending)?);
395                    pending = fragment.payload;
396                    pending_encoded = Some(fragment.encoded);
397                }
398                None => {
399                    pending = fragment.payload;
400                    pending_encoded = Some(fragment.encoded);
401                }
402            }
403        }
404
405        if let Some(current_encoded) = pending_encoded {
406            combined.push_str(&decode_notification_payload(current_encoded, &pending)?);
407        }
408
409        Some(combined).filter(|value| !value.is_empty())
410    }
411}
412
413fn decode_notification_payload(encoded: bool, payload: &str) -> Option<String> {
414    if encoded {
415        decode_base64(payload)
416    } else {
417        Some(payload.to_string())
418    }
419}
420
421fn percent_decode(value: &str) -> Option<String> {
422    let mut bytes = Vec::with_capacity(value.len());
423    let raw = value.as_bytes();
424    let mut index = 0usize;
425
426    while index < raw.len() {
427        match raw[index] {
428            b'%' if index + 2 < raw.len() => {
429                let high = decode_hex(raw[index + 1])?;
430                let low = decode_hex(raw[index + 2])?;
431                bytes.push((high << 4) | low);
432                index += 3;
433            }
434            byte => {
435                bytes.push(byte);
436                index += 1;
437            }
438        }
439    }
440
441    String::from_utf8(bytes).ok()
442}
443
/// Converts one ASCII hex digit (`0-9`, `a-f`, `A-F`) to its value; `None` otherwise.
fn decode_hex(byte: u8) -> Option<u8> {
    char::from(byte)
        .to_digit(16)
        .and_then(|digit| u8::try_from(digit).ok())
}
452
453fn frame_slice(remainder: &str) -> Option<(&str, usize)> {
454    if let Some(end) = remainder.find(BEL) {
455        return Some((&remainder[..end], end + BEL.len_utf8()));
456    }
457    if let Some(end) = remainder.find(ST) {
458        return Some((&remainder[..end], end + ST.len()));
459    }
460    None
461}
462
/// Returns the largest char boundary of `value` that is not greater than `index`.
///
/// `index` is first clamped to `value.len()`, so any `usize` is accepted.
fn floor_char_boundary(value: &str, index: usize) -> usize {
    let upper = index.min(value.len());
    // Offset 0 is always a boundary, so the search cannot come up empty.
    (0..=upper)
        .rev()
        .find(|&candidate| value.is_char_boundary(candidate))
        .unwrap_or(0)
}
470
#[cfg(test)]
mod tests {
    use base64::{Engine as _, engine::general_purpose::STANDARD};
    use taskers_domain::SignalKind;

    use super::{
        ParsedNotification, ParsedTerminalEvent, SignalStreamParser, parse_signal_frames,
        parse_terminal_events,
    };

    /// Builds the one-element event list that the notification tests compare against.
    /// None of the tests expects a subtitle, so it is always `None`.
    fn single_notification(
        title: Option<&str>,
        body: Option<&str>,
        external_id: Option<&str>,
    ) -> Vec<ParsedTerminalEvent> {
        vec![ParsedTerminalEvent::Notification(ParsedNotification {
            title: title.map(str::to_owned),
            subtitle: None,
            body: body.map(str::to_owned),
            external_id: external_id.map(str::to_owned),
        })]
    }

    #[test]
    fn parses_multiple_frames_with_different_terminators() {
        // One BEL-terminated and one ST-terminated frame, with plain text around them.
        let output = concat!(
            "hello",
            "\u{1b}]777;taskers;kind=waiting_input;message=Need%20approval\u{7}",
            "world",
            "\u{1b}]777;taskers;kind=completed;message=Done\u{1b}\\",
        );

        let signals = parse_signal_frames(output);

        assert_eq!(signals.len(), 2);
        assert_eq!(signals[0].kind, SignalKind::WaitingInput);
        assert_eq!(signals[0].message.as_deref(), Some("Need approval"));
        assert_eq!(signals[1].kind, SignalKind::Completed);
    }

    #[test]
    fn ignores_unknown_frames() {
        let signals = parse_signal_frames("\u{1b}]777;taskers;kind=unknown;message=Bad\u{7}");
        assert!(signals.is_empty());
    }

    #[test]
    fn signal_parser_ignores_notification_only_frames() {
        let signals = parse_signal_frames("\u{1b}]777;notify;Taskers;Body\u{7}");
        assert!(signals.is_empty());
    }

    #[test]
    fn stream_parser_handles_split_frames() {
        let mut parser = SignalStreamParser::default();

        // The first chunk stops mid-frame, so nothing may be emitted yet.
        let first = parser.push("\u{1b}]777;taskers;kind=waiting_input;message=Need");
        assert!(first.is_empty());

        let second = parser.push("%20approval\u{7}");
        assert_eq!(second.len(), 1);
        assert_eq!(second[0].kind, SignalKind::WaitingInput);
        assert_eq!(second[0].message.as_deref(), Some("Need approval"));
    }

    #[test]
    fn stream_parser_keeps_partial_prefix_on_utf8_boundary() {
        let mut parser = SignalStreamParser::default();
        // Multi-byte text directly before the frame exercises the char-boundary handling.
        let noisy_prefix = "abbr'...\n⠙ ";
        let partial = format!("{noisy_prefix}\u{1b}]777;taskers;kind=progress;message=Working");

        assert!(parser.push(&partial).is_empty());

        let signals = parser.push("\u{7}");
        assert_eq!(signals.len(), 1);
        assert_eq!(signals[0].kind, SignalKind::Progress);
        assert_eq!(signals[0].message.as_deref(), Some("Working"));
    }

    #[test]
    fn parses_metadata_snapshots_with_base64_fields() {
        let output = format!(
            "\u{1b}]777;taskers;kind=metadata;cwd_b64={};repo_b64={};branch_b64={};agent_b64={};title_b64={};ports=3000,8080\u{7}",
            STANDARD.encode("/home/notes/Projects/taskers"),
            STANDARD.encode("taskers"),
            STANDARD.encode("main"),
            STANDARD.encode("codex"),
            STANDARD.encode("codex · taskers"),
        );

        let signals = parse_signal_frames(&output);

        assert_eq!(signals.len(), 1);
        let signal = &signals[0];
        assert_eq!(signal.kind, SignalKind::Metadata);

        let metadata = signal.metadata.as_ref().expect("metadata snapshot");
        assert_eq!(
            metadata.cwd.as_deref(),
            Some("/home/notes/Projects/taskers")
        );
        assert_eq!(metadata.repo_name.as_deref(), Some("taskers"));
        assert_eq!(metadata.git_branch.as_deref(), Some("main"));
        assert_eq!(metadata.agent_kind.as_deref(), Some("codex"));
        assert_eq!(metadata.title.as_deref(), Some("codex · taskers"));
        assert_eq!(metadata.ports, vec![3000, 8080]);
    }

    #[test]
    fn parses_rxvt_notification_frames() {
        let events = parse_terminal_events("\u{1b}]777;notify;OSC777 Title;OSC777 Body\u{7}");

        assert_eq!(
            events,
            single_notification(Some("OSC777 Title"), Some("OSC777 Body"), None)
        );
    }

    #[test]
    fn parses_simple_kitty_notification_frames() {
        let events = parse_terminal_events("\u{1b}]99;;Kitty Simple\u{1b}\\");

        assert_eq!(events, single_notification(Some("Kitty Simple"), None, None));
    }

    #[test]
    fn parses_chunked_kitty_notification_frames() {
        let mut parser = SignalStreamParser::default();

        // `d=0` marks the notification as unfinished, so the title chunk is held back.
        let first = parser.push_events("\u{1b}]99;i=kitty:d=0:p=title;Kitty Title\u{1b}\\");
        assert!(first.is_empty());

        let events = parser.push_events("\u{1b}]99;i=kitty:p=body;Kitty Body\u{1b}\\");
        assert_eq!(
            events,
            single_notification(Some("Kitty Title"), Some("Kitty Body"), Some("kitty"))
        );
    }

    #[test]
    fn defers_title_first_chunked_kitty_notification_frames_without_part() {
        let mut parser = SignalStreamParser::default();

        // Without a `p=` parameter the payload is treated as the title.
        let first = parser.push_events("\u{1b}]99;i=kitty;d=0:Kitty Title \u{1b}\\");
        assert!(first.is_empty());

        let events = parser.push_events("\u{1b}]99;i=kitty;p=body;e=1:Qm9keQ\u{1b}\\");
        assert_eq!(
            events,
            single_notification(Some("Kitty Title "), Some("Body"), Some("kitty"))
        );
    }

    #[test]
    fn parses_encoded_kitty_notification_payloads() {
        let events = parse_terminal_events("\u{1b}]99;i=1;e=1:SGVsbG8gV29ybGQ\u{1b}\\");

        assert_eq!(
            events,
            single_notification(Some("Hello World"), None, Some("1"))
        );
    }

    #[test]
    fn concatenates_encoded_kitty_notification_chunks_before_decoding() {
        let mut parser = SignalStreamParser::default();

        let first = parser.push_events("\u{1b}]99;i=kitty;e=1;d=0:SGVsbG8g\u{1b}\\");
        assert!(first.is_empty());

        let events = parser.push_events("\u{1b}]99;i=kitty;e=1:V29ybGQ\u{1b}\\");
        assert_eq!(
            events,
            single_notification(Some("Hello World"), None, Some("kitty"))
        );
    }
}