1use std::collections::HashMap;
2
3use base64::{Engine as _, engine::general_purpose::STANDARD};
4use taskers_domain::{SignalEvent, SignalKind, SignalPaneMetadata};
5
/// Introducer (`ESC ]`) that the stream parser searches for to locate the start of a frame.
const OSC_PREFIX: &str = "\x1b]";
/// BEL control character, accepted as a frame terminator.
const BEL: char = '\x07';
/// String Terminator (`ESC \`), the other accepted frame terminator.
const ST: &str = "\x1b\\";
9
10#[derive(Debug, Clone, PartialEq, Eq)]
11pub struct ParsedSignal {
12 pub kind: SignalKind,
13 pub message: Option<String>,
14 pub metadata: Option<SignalPaneMetadata>,
15}
16
/// A desktop notification extracted from an rxvt (`777;notify;`) or kitty (`99;`) frame.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ParsedNotification {
    /// Notification title, if a non-empty one was supplied.
    pub title: Option<String>,
    /// Notification subtitle; only kitty frames can set it.
    pub subtitle: Option<String>,
    /// Notification body, if a non-empty one was supplied.
    pub body: Option<String>,
    /// Identifier from the kitty `i=` parameter; always `None` for rxvt frames.
    pub external_id: Option<String>,
}
24
25#[derive(Debug, Clone, PartialEq, Eq)]
26pub enum ParsedTerminalEvent {
27 Signal(ParsedSignal),
28 Notification(ParsedNotification),
29}
30
31#[derive(Debug, Default, Clone)]
32pub struct SignalStreamParser {
33 pending: String,
34 kitty_notification_drafts: HashMap<String, NotificationDraft>,
35}
36
37impl ParsedSignal {
38 pub fn into_event(self, source: impl Into<String>) -> SignalEvent {
39 SignalEvent::with_metadata(source, self.kind, self.message, self.metadata)
40 }
41}
42
43#[derive(Debug, Default, Clone)]
44struct NotificationDraft {
45 title: NotificationFieldDraft,
46 subtitle: NotificationFieldDraft,
47 body: NotificationFieldDraft,
48}
49
50#[derive(Debug, Default, Clone)]
51struct NotificationFieldDraft {
52 fragments: Vec<NotificationFragment>,
53}
54
/// One payload chunk of a notification field, stored exactly as it was received.
#[derive(Clone, Debug)]
struct NotificationFragment {
    /// Raw payload text (still base64 when `encoded` is set).
    payload: String,
    /// Whether the payload must be base64-decoded before use.
    encoded: bool,
}
60
61pub fn parse_terminal_events(buffer: &str) -> Vec<ParsedTerminalEvent> {
62 let mut parser = SignalStreamParser::default();
63 parser.push_events(buffer)
64}
65
66pub fn parse_signal_frames(buffer: &str) -> Vec<ParsedSignal> {
67 let mut parser = SignalStreamParser::default();
68 parser.push(buffer)
69}
70
71impl SignalStreamParser {
72 pub fn push(&mut self, chunk: &str) -> Vec<ParsedSignal> {
73 self.push_events(chunk)
74 .into_iter()
75 .filter_map(|event| match event {
76 ParsedTerminalEvent::Signal(signal) => Some(signal),
77 ParsedTerminalEvent::Notification(_) => None,
78 })
79 .collect()
80 }
81
82 pub fn push_events(&mut self, chunk: &str) -> Vec<ParsedTerminalEvent> {
83 self.pending.push_str(chunk);
84
85 let mut events = Vec::new();
86 let mut cursor = 0usize;
87 let mut keep_from = floor_char_boundary(
88 &self.pending,
89 self.pending.len().saturating_sub(OSC_PREFIX.len()),
90 );
91
92 while let Some(found) = self.pending[cursor..].find(OSC_PREFIX) {
93 let frame_start = cursor + found;
94 let content_start = frame_start + OSC_PREFIX.len();
95 let remainder = &self.pending[content_start..];
96
97 let Some((raw_frame, consumed)) = frame_slice(remainder) else {
98 keep_from = frame_start;
99 break;
100 };
101
102 let raw_frame = raw_frame.to_string();
103 if let Some(parsed) = self.parse_frame(&raw_frame) {
104 events.push(parsed);
105 }
106
107 cursor = content_start + consumed;
108 keep_from = cursor;
109 }
110
111 self.pending = self.pending[floor_char_boundary(&self.pending, keep_from)..].to_string();
112 events
113 }
114
115 fn parse_frame(&mut self, frame: &str) -> Option<ParsedTerminalEvent> {
116 if let Some(frame) = frame.strip_prefix("777;taskers;") {
117 return parse_taskers_frame(frame).map(ParsedTerminalEvent::Signal);
118 }
119 if let Some(frame) = frame.strip_prefix("777;notify;") {
120 return parse_rxvt_notification(frame).map(ParsedTerminalEvent::Notification);
121 }
122 if let Some(frame) = frame.strip_prefix("99;") {
123 return self
124 .parse_kitty_notification(frame)
125 .map(ParsedTerminalEvent::Notification);
126 }
127 None
128 }
129}
130
131fn parse_taskers_frame(frame: &str) -> Option<ParsedSignal> {
132 let mut kind = None;
133 let mut message = None;
134 let mut title = None;
135 let mut cwd = None;
136 let mut repo_name = None;
137 let mut git_branch = None;
138 let mut agent_kind = None;
139 let mut agent_active = None;
140 let mut ports = None;
141
142 for part in frame.split(';') {
143 let (key, value) = part.split_once('=')?;
144 match key {
145 "kind" => {
146 kind = Some(match value {
147 "metadata" => SignalKind::Metadata,
148 "started" => SignalKind::Started,
149 "progress" => SignalKind::Progress,
150 "completed" => SignalKind::Completed,
151 "waiting_input" => SignalKind::WaitingInput,
152 "error" => SignalKind::Error,
153 "notification" => SignalKind::Notification,
154 _ => return None,
155 });
156 }
157 "message" => message = percent_decode(value),
158 "message_b64" => message = decode_base64(value),
159 "title" => title = percent_decode(value),
160 "title_b64" => title = decode_base64(value),
161 "cwd" => cwd = percent_decode(value),
162 "cwd_b64" => cwd = decode_base64(value),
163 "repo" | "repo_name" => repo_name = percent_decode(value),
164 "repo_b64" | "repo_name_b64" => repo_name = decode_base64(value),
165 "branch" | "git_branch" => git_branch = percent_decode(value),
166 "branch_b64" | "git_branch_b64" => git_branch = decode_base64(value),
167 "agent" | "agent_kind" => agent_kind = percent_decode(value),
168 "agent_b64" | "agent_kind_b64" => agent_kind = decode_base64(value),
169 "agent_active" => agent_active = parse_bool(value),
170 "agent_active_b64" => {
171 agent_active = decode_base64(value).and_then(|decoded| parse_bool(&decoded))
172 }
173 "ports" => ports = parse_ports(value),
174 "ports_b64" => ports = decode_base64(value).and_then(|decoded| parse_ports(&decoded)),
175 _ => {}
176 }
177 }
178
179 let metadata = if title.is_some()
180 || cwd.is_some()
181 || repo_name.is_some()
182 || git_branch.is_some()
183 || agent_kind.is_some()
184 || agent_active.is_some()
185 || ports.is_some()
186 {
187 Some(SignalPaneMetadata {
188 title,
189 agent_title: None,
190 cwd,
191 repo_name,
192 git_branch,
193 ports: ports.unwrap_or_default(),
194 agent_kind,
195 agent_active,
196 agent_command: None,
197 })
198 } else {
199 None
200 };
201
202 Some(ParsedSignal {
203 kind: kind?,
204 message,
205 metadata,
206 })
207}
208
209fn parse_rxvt_notification(frame: &str) -> Option<ParsedNotification> {
210 let (title, body) = match frame.split_once(';') {
211 Some((title, body)) => (title, Some(body)),
212 None => (frame, None),
213 };
214
215 let title = Some(title.to_string()).filter(|value| !value.is_empty());
216 let body = body.map(str::to_string).filter(|value| !value.is_empty());
217
218 if title.is_none() && body.is_none() {
219 return None;
220 }
221
222 Some(ParsedNotification {
223 title,
224 subtitle: None,
225 body,
226 external_id: None,
227 })
228}
229
230impl SignalStreamParser {
231 fn parse_kitty_notification(&mut self, frame: &str) -> Option<ParsedNotification> {
232 let (param_tokens, payload) = split_kitty_params_and_payload(frame);
233 let mut external_id = None;
234 let mut part = None;
235 let mut done = None;
236 let mut encoded = false;
237
238 for token in param_tokens {
239 let (key, value) = token.split_once('=')?;
240 match key {
241 "i" => {
242 external_id = Some(value.to_string()).filter(|value| !value.is_empty());
243 }
244 "p" => {
245 part = Some(value.to_ascii_lowercase());
246 }
247 "d" => {
248 done = match value {
249 "0" => Some(false),
250 "1" => Some(true),
251 _ => None,
252 };
253 }
254 "e" => {
255 encoded = value == "1";
256 }
257 _ => {}
258 }
259 }
260
261 let mut draft = external_id
262 .as_ref()
263 .and_then(|id| self.kitty_notification_drafts.remove(id))
264 .unwrap_or_default();
265
266 let payload = Some(payload.to_string()).filter(|value| !value.is_empty());
267 match part.as_deref() {
268 Some("title") | None => {
269 if let Some(payload) = payload {
270 draft.title.push(payload, encoded);
271 }
272 }
273 Some("subtitle") => {
274 if let Some(payload) = payload {
275 draft.subtitle.push(payload, encoded);
276 }
277 }
278 Some("body") => {
279 if let Some(payload) = payload {
280 draft.body.push(payload, encoded);
281 }
282 }
283 Some(_) => {}
284 }
285
286 let should_defer = matches!(done, Some(false));
287 if should_defer {
288 if let Some(external_id) = external_id {
289 self.kitty_notification_drafts.insert(external_id, draft);
290 }
291 return None;
292 }
293
294 let title = draft.title.into_value();
295 let subtitle = draft.subtitle.into_value();
296 let body = draft.body.into_value();
297
298 if title.is_none() && subtitle.is_none() && body.is_none() {
299 return None;
300 }
301
302 Some(ParsedNotification {
303 title,
304 subtitle,
305 body,
306 external_id,
307 })
308 }
309}
310
311fn split_kitty_params_and_payload(frame: &str) -> (Vec<&str>, &str) {
312 let mut params = Vec::new();
313 let mut start = 0usize;
314
315 if let Some(stripped) = frame.strip_prefix([';', ':']) {
316 return (params, stripped);
317 }
318
319 while start < frame.len() {
320 let remainder = &frame[start..];
321 let Some(separator) = remainder.find([';', ':']) else {
322 if is_kitty_param_token(remainder) {
323 params.push(remainder);
324 return (params, "");
325 }
326 return (params, remainder);
327 };
328
329 let token_end = start + separator;
330 let token = &frame[start..token_end];
331 if !is_kitty_param_token(token) {
332 return (params, &frame[start..]);
333 }
334
335 params.push(token);
336 start = token_end + 1;
337 }
338
339 (params, "")
340}
341
/// Reports whether `token` looks like a kitty metadata parameter (`key=value`).
///
/// The key must be exactly one ASCII letter, which is how the kitty notification protocol
/// defines metadata keys. Accepting any non-empty key would misclassify payload text that
/// happens to contain `=` — most importantly padded base64 such as `SGVsbG8=` — as a
/// parameter, and the payload would then be silently dropped. The value may be empty.
fn is_kitty_param_token(token: &str) -> bool {
    token.split_once('=').is_some_and(|(key, _)| {
        let mut key_chars = key.chars();
        matches!(
            (key_chars.next(), key_chars.next()),
            (Some(letter), None) if letter.is_ascii_alphabetic()
        )
    })
}
347
/// Parses a comma-separated list of port numbers.
///
/// An empty string yields an empty list. Returns `None` if any entry is not a valid `u16`.
fn parse_ports(value: &str) -> Option<Vec<u16>> {
    let mut ports = Vec::new();
    if !value.is_empty() {
        for entry in value.split(',') {
            ports.push(entry.parse::<u16>().ok()?);
        }
    }
    Some(ports)
}
358
/// Parses a boolean flag, ignoring surrounding whitespace and ASCII case.
///
/// Accepts `1`/`true`/`yes`/`on` and `0`/`false`/`no`/`off`; anything else is `None`.
fn parse_bool(value: &str) -> Option<bool> {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];
    const FALSY: [&str; 4] = ["0", "false", "no", "off"];

    let flag = value.trim();
    let is_one_of = |words: &[&str]| words.iter().any(|word| flag.eq_ignore_ascii_case(word));

    if is_one_of(&TRUTHY) {
        Some(true)
    } else if is_one_of(&FALSY) {
        Some(false)
    } else {
        None
    }
}
366
367fn decode_base64(value: &str) -> Option<String> {
368 let mut normalized = value.to_string();
369 let missing_padding = normalized.len() % 4;
370 if missing_padding != 0 {
371 normalized.extend(std::iter::repeat_n('=', 4 - missing_padding));
372 }
373 let decoded = STANDARD.decode(normalized).ok()?;
374 String::from_utf8(decoded).ok()
375}
376
377impl NotificationFieldDraft {
378 fn push(&mut self, payload: String, encoded: bool) {
379 self.fragments
380 .push(NotificationFragment { payload, encoded });
381 }
382
383 fn into_value(self) -> Option<String> {
384 let mut combined = String::new();
385 let mut pending = String::new();
386 let mut pending_encoded = None;
387
388 for fragment in self.fragments {
389 match pending_encoded {
390 Some(current_encoded) if current_encoded == fragment.encoded => {
391 pending.push_str(&fragment.payload);
392 }
393 Some(current_encoded) => {
394 combined.push_str(&decode_notification_payload(current_encoded, &pending)?);
395 pending = fragment.payload;
396 pending_encoded = Some(fragment.encoded);
397 }
398 None => {
399 pending = fragment.payload;
400 pending_encoded = Some(fragment.encoded);
401 }
402 }
403 }
404
405 if let Some(current_encoded) = pending_encoded {
406 combined.push_str(&decode_notification_payload(current_encoded, &pending)?);
407 }
408
409 Some(combined).filter(|value| !value.is_empty())
410 }
411}
412
413fn decode_notification_payload(encoded: bool, payload: &str) -> Option<String> {
414 if encoded {
415 decode_base64(payload)
416 } else {
417 Some(payload.to_string())
418 }
419}
420
421fn percent_decode(value: &str) -> Option<String> {
422 let mut bytes = Vec::with_capacity(value.len());
423 let raw = value.as_bytes();
424 let mut index = 0usize;
425
426 while index < raw.len() {
427 match raw[index] {
428 b'%' if index + 2 < raw.len() => {
429 let high = decode_hex(raw[index + 1])?;
430 let low = decode_hex(raw[index + 2])?;
431 bytes.push((high << 4) | low);
432 index += 3;
433 }
434 byte => {
435 bytes.push(byte);
436 index += 1;
437 }
438 }
439 }
440
441 String::from_utf8(bytes).ok()
442}
443
/// Converts one ASCII hex digit (`0-9`, `a-f`, `A-F`) to its numeric value.
///
/// Returns `None` for any other byte.
fn decode_hex(byte: u8) -> Option<u8> {
    // `to_digit(16)` yields values below 16, so narrowing to `u8` cannot truncate.
    char::from(byte).to_digit(16).map(|digit| digit as u8)
}
452
453fn frame_slice(remainder: &str) -> Option<(&str, usize)> {
454 if let Some(end) = remainder.find(BEL) {
455 return Some((&remainder[..end], end + BEL.len_utf8()));
456 }
457 if let Some(end) = remainder.find(ST) {
458 return Some((&remainder[..end], end + ST.len()));
459 }
460 None
461}
462
/// Returns the largest char boundary in `value` that is at or below `index`.
///
/// `index` is clamped to `value.len()` first, so out-of-range inputs are safe.
fn floor_char_boundary(value: &str, index: usize) -> usize {
    let upper = index.min(value.len());
    (0..=upper)
        .rev()
        .find(|&candidate| value.is_char_boundary(candidate))
        // Offset 0 is always a boundary, so the search cannot actually come up empty.
        .unwrap_or(0)
}
470
#[cfg(test)]
mod tests {
    use base64::{Engine as _, engine::general_purpose::STANDARD};
    use taskers_domain::SignalKind;

    use super::{
        ParsedNotification, ParsedTerminalEvent, SignalStreamParser, parse_signal_frames,
        parse_terminal_events,
    };

    /// Builds the notification event a test expects from borrowed field values.
    fn notification(
        title: Option<&str>,
        subtitle: Option<&str>,
        body: Option<&str>,
        external_id: Option<&str>,
    ) -> ParsedTerminalEvent {
        ParsedTerminalEvent::Notification(ParsedNotification {
            title: title.map(str::to_owned),
            subtitle: subtitle.map(str::to_owned),
            body: body.map(str::to_owned),
            external_id: external_id.map(str::to_owned),
        })
    }

    #[test]
    fn parses_multiple_frames_with_different_terminators() {
        let output = concat!(
            "hello",
            "\u{1b}]777;taskers;kind=waiting_input;message=Need%20approval\u{7}",
            "world",
            "\u{1b}]777;taskers;kind=completed;message=Done\u{1b}\\",
        );

        let frames = parse_signal_frames(output);

        assert_eq!(frames.len(), 2);
        let (first, second) = (&frames[0], &frames[1]);
        assert_eq!(first.kind, SignalKind::WaitingInput);
        assert_eq!(first.message.as_deref(), Some("Need approval"));
        assert_eq!(second.kind, SignalKind::Completed);
    }

    #[test]
    fn ignores_unknown_frames() {
        let frames = parse_signal_frames("\u{1b}]777;taskers;kind=unknown;message=Bad\u{7}");
        assert!(frames.is_empty());
    }

    #[test]
    fn signal_parser_ignores_notification_only_frames() {
        let frames = parse_signal_frames("\u{1b}]777;notify;Taskers;Body\u{7}");
        assert!(frames.is_empty());
    }

    #[test]
    fn stream_parser_handles_split_frames() {
        let mut parser = SignalStreamParser::default();

        // The first chunk stops mid-frame, so nothing may be emitted yet.
        let early = parser.push("\u{1b}]777;taskers;kind=waiting_input;message=Need");
        assert!(early.is_empty());

        let frames = parser.push("%20approval\u{7}");
        assert_eq!(frames.len(), 1);
        assert_eq!(frames[0].kind, SignalKind::WaitingInput);
        assert_eq!(frames[0].message.as_deref(), Some("Need approval"));
    }

    #[test]
    fn stream_parser_keeps_partial_prefix_on_utf8_boundary() {
        let mut parser = SignalStreamParser::default();
        // Multi-byte text ahead of the frame exercises the char-boundary handling.
        let noisy_prefix = "abbr'...\n⠙ ";
        let partial = format!("{noisy_prefix}\u{1b}]777;taskers;kind=progress;message=Working");

        assert!(parser.push(&partial).is_empty());

        let frames = parser.push("\u{7}");
        assert_eq!(frames.len(), 1);
        assert_eq!(frames[0].kind, SignalKind::Progress);
        assert_eq!(frames[0].message.as_deref(), Some("Working"));
    }

    #[test]
    fn parses_metadata_snapshots_with_base64_fields() {
        let output = format!(
            "\u{1b}]777;taskers;kind=metadata;cwd_b64={};repo_b64={};branch_b64={};agent_b64={};title_b64={};ports=3000,8080\u{7}",
            STANDARD.encode("/home/notes/Projects/taskers"),
            STANDARD.encode("taskers"),
            STANDARD.encode("main"),
            STANDARD.encode("codex"),
            STANDARD.encode("codex · taskers"),
        );

        let frames = parse_signal_frames(&output);

        assert_eq!(frames.len(), 1);
        let frame = &frames[0];
        assert_eq!(frame.kind, SignalKind::Metadata);
        let metadata = frame.metadata.as_ref().expect("metadata snapshot");
        assert_eq!(
            metadata.cwd.as_deref(),
            Some("/home/notes/Projects/taskers")
        );
        assert_eq!(metadata.repo_name.as_deref(), Some("taskers"));
        assert_eq!(metadata.git_branch.as_deref(), Some("main"));
        assert_eq!(metadata.agent_kind.as_deref(), Some("codex"));
        assert_eq!(metadata.title.as_deref(), Some("codex · taskers"));
        assert_eq!(metadata.ports, vec![3000, 8080]);
    }

    #[test]
    fn parses_rxvt_notification_frames() {
        let frames = parse_terminal_events("\u{1b}]777;notify;OSC777 Title;OSC777 Body\u{7}");

        assert_eq!(
            frames,
            vec![notification(
                Some("OSC777 Title"),
                None,
                Some("OSC777 Body"),
                None
            )]
        );
    }

    #[test]
    fn parses_simple_kitty_notification_frames() {
        let frames = parse_terminal_events("\u{1b}]99;;Kitty Simple\u{1b}\\");

        assert_eq!(
            frames,
            vec![notification(Some("Kitty Simple"), None, None, None)]
        );
    }

    #[test]
    fn parses_chunked_kitty_notification_frames() {
        let mut parser = SignalStreamParser::default();

        // `d=0` marks the notification as incomplete, so the title chunk is held back.
        let deferred = parser.push_events("\u{1b}]99;i=kitty:d=0:p=title;Kitty Title\u{1b}\\");
        assert!(deferred.is_empty());

        let frames = parser.push_events("\u{1b}]99;i=kitty:p=body;Kitty Body\u{1b}\\");
        assert_eq!(
            frames,
            vec![notification(
                Some("Kitty Title"),
                None,
                Some("Kitty Body"),
                Some("kitty")
            )]
        );
    }

    #[test]
    fn defers_title_first_chunked_kitty_notification_frames_without_part() {
        let mut parser = SignalStreamParser::default();

        let deferred = parser.push_events("\u{1b}]99;i=kitty;d=0:Kitty Title \u{1b}\\");
        assert!(deferred.is_empty());

        let frames = parser.push_events("\u{1b}]99;i=kitty;p=body;e=1:Qm9keQ\u{1b}\\");
        assert_eq!(
            frames,
            vec![notification(
                Some("Kitty Title "),
                None,
                Some("Body"),
                Some("kitty")
            )]
        );
    }

    #[test]
    fn parses_encoded_kitty_notification_payloads() {
        let frames = parse_terminal_events("\u{1b}]99;i=1;e=1:SGVsbG8gV29ybGQ\u{1b}\\");

        assert_eq!(
            frames,
            vec![notification(Some("Hello World"), None, None, Some("1"))]
        );
    }

    #[test]
    fn concatenates_encoded_kitty_notification_chunks_before_decoding() {
        let mut parser = SignalStreamParser::default();

        let deferred = parser.push_events("\u{1b}]99;i=kitty;e=1;d=0:SGVsbG8g\u{1b}\\");
        assert!(deferred.is_empty());

        let frames = parser.push_events("\u{1b}]99;i=kitty;e=1:V29ybGQ\u{1b}\\");
        assert_eq!(
            frames,
            vec![notification(Some("Hello World"), None, None, Some("kitty"))]
        );
    }
}