1use std::collections::HashMap;
2
3use base64::{Engine as _, engine::general_purpose::STANDARD};
4use taskers_domain::{SignalEvent, SignalKind, SignalPaneMetadata};
5
6const OSC_PREFIX: &str = "\u{1b}]";
7const BEL: char = '\u{7}';
8const ST: &str = "\u{1b}\\";
9
10#[derive(Debug, Clone, PartialEq, Eq)]
11pub struct ParsedSignal {
12 pub kind: SignalKind,
13 pub message: Option<String>,
14 pub metadata: Option<SignalPaneMetadata>,
15}
16
/// A desktop notification decoded from an rxvt (`777;notify;`) or kitty (`99;`) OSC frame.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ParsedNotification {
    /// Notification title, if one was supplied.
    pub title: Option<String>,
    /// Notification subtitle; only the kitty parser ever fills this in.
    pub subtitle: Option<String>,
    /// Notification body text, if one was supplied.
    pub body: Option<String>,
    /// Identifier taken from the kitty `i=` parameter; always `None` for rxvt frames.
    pub external_id: Option<String>,
}
24
25#[derive(Debug, Clone, PartialEq, Eq)]
26pub enum ParsedTerminalEvent {
27 Signal(ParsedSignal),
28 Notification(ParsedNotification),
29}
30
31#[derive(Debug, Default, Clone)]
32pub struct SignalStreamParser {
33 pending: String,
34 kitty_notification_drafts: HashMap<String, NotificationDraft>,
35}
36
37impl ParsedSignal {
38 pub fn into_event(self, source: impl Into<String>) -> SignalEvent {
39 SignalEvent::with_metadata(source, self.kind, self.message, self.metadata)
40 }
41}
42
/// Accumulates the parts of a kitty notification that is delivered over several frames.
#[derive(Clone, Debug, Default)]
struct NotificationDraft {
    /// Title collected so far.
    title: Option<String>,
    /// Subtitle collected so far.
    subtitle: Option<String>,
    /// Body collected so far.
    body: Option<String>,
}
49
50pub fn parse_terminal_events(buffer: &str) -> Vec<ParsedTerminalEvent> {
51 let mut parser = SignalStreamParser::default();
52 parser.push_events(buffer)
53}
54
55pub fn parse_signal_frames(buffer: &str) -> Vec<ParsedSignal> {
56 let mut parser = SignalStreamParser::default();
57 parser.push(buffer)
58}
59
60impl SignalStreamParser {
61 pub fn push(&mut self, chunk: &str) -> Vec<ParsedSignal> {
62 self.push_events(chunk)
63 .into_iter()
64 .filter_map(|event| match event {
65 ParsedTerminalEvent::Signal(signal) => Some(signal),
66 ParsedTerminalEvent::Notification(_) => None,
67 })
68 .collect()
69 }
70
71 pub fn push_events(&mut self, chunk: &str) -> Vec<ParsedTerminalEvent> {
72 self.pending.push_str(chunk);
73
74 let mut events = Vec::new();
75 let mut cursor = 0usize;
76 let mut keep_from = floor_char_boundary(
77 &self.pending,
78 self.pending.len().saturating_sub(OSC_PREFIX.len()),
79 );
80
81 while let Some(found) = self.pending[cursor..].find(OSC_PREFIX) {
82 let frame_start = cursor + found;
83 let content_start = frame_start + OSC_PREFIX.len();
84 let remainder = &self.pending[content_start..];
85
86 let Some((raw_frame, consumed)) = frame_slice(remainder) else {
87 keep_from = frame_start;
88 break;
89 };
90
91 let raw_frame = raw_frame.to_string();
92 if let Some(parsed) = self.parse_frame(&raw_frame) {
93 events.push(parsed);
94 }
95
96 cursor = content_start + consumed;
97 keep_from = cursor;
98 }
99
100 self.pending = self.pending[floor_char_boundary(&self.pending, keep_from)..].to_string();
101 events
102 }
103
104 fn parse_frame(&mut self, frame: &str) -> Option<ParsedTerminalEvent> {
105 if let Some(frame) = frame.strip_prefix("777;taskers;") {
106 return parse_taskers_frame(frame).map(ParsedTerminalEvent::Signal);
107 }
108 if let Some(frame) = frame.strip_prefix("777;notify;") {
109 return parse_rxvt_notification(frame).map(ParsedTerminalEvent::Notification);
110 }
111 if let Some(frame) = frame.strip_prefix("99;") {
112 return self
113 .parse_kitty_notification(frame)
114 .map(ParsedTerminalEvent::Notification);
115 }
116 None
117 }
118}
119
120fn parse_taskers_frame(frame: &str) -> Option<ParsedSignal> {
121 let mut kind = None;
122 let mut message = None;
123 let mut title = None;
124 let mut cwd = None;
125 let mut repo_name = None;
126 let mut git_branch = None;
127 let mut agent_kind = None;
128 let mut agent_active = None;
129 let mut ports = None;
130
131 for part in frame.split(';') {
132 let (key, value) = part.split_once('=')?;
133 match key {
134 "kind" => {
135 kind = Some(match value {
136 "metadata" => SignalKind::Metadata,
137 "started" => SignalKind::Started,
138 "progress" => SignalKind::Progress,
139 "completed" => SignalKind::Completed,
140 "waiting_input" => SignalKind::WaitingInput,
141 "error" => SignalKind::Error,
142 "notification" => SignalKind::Notification,
143 _ => return None,
144 });
145 }
146 "message" => message = percent_decode(value),
147 "message_b64" => message = decode_base64(value),
148 "title" => title = percent_decode(value),
149 "title_b64" => title = decode_base64(value),
150 "cwd" => cwd = percent_decode(value),
151 "cwd_b64" => cwd = decode_base64(value),
152 "repo" | "repo_name" => repo_name = percent_decode(value),
153 "repo_b64" | "repo_name_b64" => repo_name = decode_base64(value),
154 "branch" | "git_branch" => git_branch = percent_decode(value),
155 "branch_b64" | "git_branch_b64" => git_branch = decode_base64(value),
156 "agent" | "agent_kind" => agent_kind = percent_decode(value),
157 "agent_b64" | "agent_kind_b64" => agent_kind = decode_base64(value),
158 "agent_active" => agent_active = parse_bool(value),
159 "agent_active_b64" => {
160 agent_active = decode_base64(value).and_then(|decoded| parse_bool(&decoded))
161 }
162 "ports" => ports = parse_ports(value),
163 "ports_b64" => ports = decode_base64(value).and_then(|decoded| parse_ports(&decoded)),
164 _ => {}
165 }
166 }
167
168 let metadata = if title.is_some()
169 || cwd.is_some()
170 || repo_name.is_some()
171 || git_branch.is_some()
172 || agent_kind.is_some()
173 || agent_active.is_some()
174 || ports.is_some()
175 {
176 Some(SignalPaneMetadata {
177 title,
178 agent_title: None,
179 cwd,
180 repo_name,
181 git_branch,
182 ports: ports.unwrap_or_default(),
183 agent_kind,
184 agent_active,
185 })
186 } else {
187 None
188 };
189
190 Some(ParsedSignal {
191 kind: kind?,
192 message,
193 metadata,
194 })
195}
196
197fn parse_rxvt_notification(frame: &str) -> Option<ParsedNotification> {
198 let (title, body) = match frame.split_once(';') {
199 Some((title, body)) => (title, Some(body)),
200 None => (frame, None),
201 };
202
203 let title = Some(title.to_string()).filter(|value| !value.is_empty());
204 let body = body.map(str::to_string).filter(|value| !value.is_empty());
205
206 if title.is_none() && body.is_none() {
207 return None;
208 }
209
210 Some(ParsedNotification {
211 title,
212 subtitle: None,
213 body,
214 external_id: None,
215 })
216}
217
218impl SignalStreamParser {
219 fn parse_kitty_notification(&mut self, frame: &str) -> Option<ParsedNotification> {
220 let (param_tokens, payload) = split_kitty_params_and_payload(frame);
221 let mut external_id = None;
222 let mut part = None;
223 let mut done = None;
224
225 for token in param_tokens {
226 let (key, value) = token.split_once('=')?;
227 match key {
228 "i" => {
229 external_id = Some(value.to_string()).filter(|value| !value.is_empty());
230 }
231 "p" => {
232 part = Some(value.to_ascii_lowercase());
233 }
234 "d" => {
235 done = match value {
236 "0" => Some(false),
237 "1" => Some(true),
238 _ => None,
239 };
240 }
241 "e" => {}
242 _ => {}
243 }
244 }
245
246 let mut draft = external_id
247 .as_ref()
248 .and_then(|id| self.kitty_notification_drafts.remove(id))
249 .unwrap_or_default();
250
251 let payload = Some(payload.to_string()).filter(|value| !value.is_empty());
252 match part.as_deref() {
253 Some("title") => draft.title = payload,
254 Some("subtitle") => draft.subtitle = payload,
255 Some("body") => draft.body = payload,
256 Some(_) => {}
257 None => {
258 if let Some(payload) = payload {
259 if draft.title.is_none() {
260 draft.title = Some(payload);
261 } else {
262 draft.body = Some(payload);
263 }
264 }
265 }
266 }
267
268 let should_defer = matches!(done, Some(false)) && part.is_some();
269 if should_defer {
270 if let Some(external_id) = external_id {
271 self.kitty_notification_drafts.insert(external_id, draft);
272 }
273 return None;
274 }
275
276 if draft.title.is_none() && draft.subtitle.is_none() && draft.body.is_none() {
277 return None;
278 }
279
280 Some(ParsedNotification {
281 title: draft.title,
282 subtitle: draft.subtitle,
283 body: draft.body,
284 external_id,
285 })
286 }
287}
288
289fn split_kitty_params_and_payload(frame: &str) -> (Vec<&str>, &str) {
290 let mut params = Vec::new();
291 let mut start = 0usize;
292
293 if let Some(stripped) = frame.strip_prefix([';', ':']) {
294 return (params, stripped);
295 }
296
297 while start < frame.len() {
298 let remainder = &frame[start..];
299 let Some(separator) = remainder.find([';', ':']) else {
300 if is_kitty_param_token(remainder) {
301 params.push(remainder);
302 return (params, "");
303 }
304 return (params, remainder);
305 };
306
307 let token_end = start + separator;
308 let token = &frame[start..token_end];
309 if !is_kitty_param_token(token) {
310 return (params, &frame[start..]);
311 }
312
313 params.push(token);
314 start = token_end + 1;
315 }
316
317 (params, "")
318}
319
/// Reports whether `token` has the shape `key=value` with a non-empty key.
/// The value part may be empty.
fn is_kitty_param_token(token: &str) -> bool {
    match token.split_once('=') {
        Some((key, _)) => !key.is_empty(),
        None => false,
    }
}
325
/// Parses a comma-separated list of port numbers.
///
/// An empty string yields an empty list. If any element fails to parse as a `u16`
/// the whole list is rejected with `None`.
fn parse_ports(value: &str) -> Option<Vec<u16>> {
    let mut ports = Vec::new();
    if value.is_empty() {
        return Some(ports);
    }

    for entry in value.split(',') {
        ports.push(entry.parse::<u16>().ok()?);
    }
    Some(ports)
}
336
/// Parses a human-style boolean, ignoring surrounding whitespace and ASCII case.
///
/// `1`, `true`, `yes`, `on` map to `true`; `0`, `false`, `no`, `off` map to `false`;
/// anything else yields `None`.
fn parse_bool(value: &str) -> Option<bool> {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];
    const FALSY: [&str; 4] = ["0", "false", "no", "off"];

    let normalized = value.trim().to_ascii_lowercase();
    let word = normalized.as_str();
    if TRUTHY.contains(&word) {
        Some(true)
    } else if FALSY.contains(&word) {
        Some(false)
    } else {
        None
    }
}
344
345fn decode_base64(value: &str) -> Option<String> {
346 let decoded = STANDARD.decode(value).ok()?;
347 String::from_utf8(decoded).ok()
348}
349
350fn percent_decode(value: &str) -> Option<String> {
351 let mut bytes = Vec::with_capacity(value.len());
352 let raw = value.as_bytes();
353 let mut index = 0usize;
354
355 while index < raw.len() {
356 match raw[index] {
357 b'%' if index + 2 < raw.len() => {
358 let high = decode_hex(raw[index + 1])?;
359 let low = decode_hex(raw[index + 2])?;
360 bytes.push((high << 4) | low);
361 index += 3;
362 }
363 byte => {
364 bytes.push(byte);
365 index += 1;
366 }
367 }
368 }
369
370 String::from_utf8(bytes).ok()
371}
372
/// Returns the numeric value (0–15) of an ASCII hex digit, accepting both cases,
/// or `None` for any other byte.
fn decode_hex(byte: u8) -> Option<u8> {
    char::from(byte)
        .to_digit(16)
        .and_then(|digit| u8::try_from(digit).ok())
}
381
382fn frame_slice(remainder: &str) -> Option<(&str, usize)> {
383 if let Some(end) = remainder.find(BEL) {
384 return Some((&remainder[..end], end + BEL.len_utf8()));
385 }
386 if let Some(end) = remainder.find(ST) {
387 return Some((&remainder[..end], end + ST.len()));
388 }
389 None
390}
391
/// Returns the largest char boundary of `value` that is not greater than `index`.
/// Indices past the end are clamped to `value.len()`.
fn floor_char_boundary(value: &str, index: usize) -> usize {
    let capped = index.min(value.len());
    // Offset 0 is always a boundary, so the search cannot come up empty.
    (0..=capped)
        .rev()
        .find(|&candidate| value.is_char_boundary(candidate))
        .unwrap_or(0)
}
399
#[cfg(test)]
mod tests {
    use base64::{Engine as _, engine::general_purpose::STANDARD};
    use taskers_domain::SignalKind;

    use super::{
        ParsedNotification, ParsedTerminalEvent, SignalStreamParser, parse_signal_frames,
        parse_terminal_events,
    };

    /// Builds the expected notification event; no test here uses a subtitle.
    fn notification(
        title: Option<&str>,
        body: Option<&str>,
        external_id: Option<&str>,
    ) -> ParsedTerminalEvent {
        ParsedTerminalEvent::Notification(ParsedNotification {
            title: title.map(Into::into),
            subtitle: None,
            body: body.map(Into::into),
            external_id: external_id.map(Into::into),
        })
    }

    // A BEL-terminated frame and an ST-terminated frame in the same buffer.
    #[test]
    fn parses_multiple_frames_with_different_terminators() {
        let output = concat!(
            "hello",
            "\u{1b}]777;taskers;kind=waiting_input;message=Need%20approval\u{7}",
            "world",
            "\u{1b}]777;taskers;kind=completed;message=Done\u{1b}\\",
        );

        let signals = parse_signal_frames(output);

        assert_eq!(signals.len(), 2);
        let (first, second) = (&signals[0], &signals[1]);
        assert_eq!(first.kind, SignalKind::WaitingInput);
        assert_eq!(first.message.as_deref(), Some("Need approval"));
        assert_eq!(second.kind, SignalKind::Completed);
    }

    // A frame whose kind is not recognised produces nothing.
    #[test]
    fn ignores_unknown_frames() {
        let signals = parse_signal_frames("\u{1b}]777;taskers;kind=unknown;message=Bad\u{7}");
        assert!(signals.is_empty());
    }

    // Notification frames are filtered out of the signal-only API.
    #[test]
    fn signal_parser_ignores_notification_only_frames() {
        let signals = parse_signal_frames("\u{1b}]777;notify;Taskers;Body\u{7}");
        assert!(signals.is_empty());
    }

    // A frame split across two pushes is emitted once its terminator arrives.
    #[test]
    fn stream_parser_handles_split_frames() {
        let mut parser = SignalStreamParser::default();

        let first_half = parser.push("\u{1b}]777;taskers;kind=waiting_input;message=Need");
        assert!(first_half.is_empty());

        let signals = parser.push("%20approval\u{7}");
        assert_eq!(signals.len(), 1);
        assert_eq!(signals[0].kind, SignalKind::WaitingInput);
        assert_eq!(signals[0].message.as_deref(), Some("Need approval"));
    }

    // Multi-byte text in front of a pending frame must not break buffer trimming.
    #[test]
    fn stream_parser_keeps_partial_prefix_on_utf8_boundary() {
        let mut parser = SignalStreamParser::default();
        let noisy_prefix = "abbr'...\n⠙ ";
        let partial = format!("{noisy_prefix}\u{1b}]777;taskers;kind=progress;message=Working");

        assert!(parser.push(&partial).is_empty());

        let signals = parser.push("\u{7}");
        assert_eq!(signals.len(), 1);
        assert_eq!(signals[0].kind, SignalKind::Progress);
        assert_eq!(signals[0].message.as_deref(), Some("Working"));
    }

    // Base64 metadata fields and a plain ports list in a single snapshot frame.
    #[test]
    fn parses_metadata_snapshots_with_base64_fields() {
        let output = format!(
            "\u{1b}]777;taskers;kind=metadata;cwd_b64={};repo_b64={};branch_b64={};agent_b64={};title_b64={};ports=3000,8080\u{7}",
            STANDARD.encode("/home/notes/Projects/taskers"),
            STANDARD.encode("taskers"),
            STANDARD.encode("main"),
            STANDARD.encode("codex"),
            STANDARD.encode("codex · taskers"),
        );

        let signals = parse_signal_frames(&output);

        assert_eq!(signals.len(), 1);
        let snapshot = &signals[0];
        assert_eq!(snapshot.kind, SignalKind::Metadata);

        let metadata = snapshot.metadata.as_ref().expect("metadata snapshot");
        assert_eq!(
            metadata.cwd.as_deref(),
            Some("/home/notes/Projects/taskers")
        );
        assert_eq!(metadata.repo_name.as_deref(), Some("taskers"));
        assert_eq!(metadata.git_branch.as_deref(), Some("main"));
        assert_eq!(metadata.agent_kind.as_deref(), Some("codex"));
        assert_eq!(metadata.title.as_deref(), Some("codex · taskers"));
        assert_eq!(metadata.ports, vec![3000, 8080]);
    }

    // rxvt-style `777;notify;title;body`.
    #[test]
    fn parses_rxvt_notification_frames() {
        let events = parse_terminal_events("\u{1b}]777;notify;OSC777 Title;OSC777 Body\u{7}");

        assert_eq!(
            events,
            vec![notification(Some("OSC777 Title"), Some("OSC777 Body"), None)]
        );
    }

    // kitty frame with no parameters: the payload becomes the title.
    #[test]
    fn parses_simple_kitty_notification_frames() {
        let events = parse_terminal_events("\u{1b}]99;;Kitty Simple\u{1b}\\");

        assert_eq!(events, vec![notification(Some("Kitty Simple"), None, None)]);
    }

    // kitty notification delivered as a deferred title part followed by a body part.
    #[test]
    fn parses_chunked_kitty_notification_frames() {
        let mut parser = SignalStreamParser::default();

        let deferred = parser.push_events("\u{1b}]99;i=kitty:d=0:p=title;Kitty Title\u{1b}\\");
        assert!(deferred.is_empty());

        let events = parser.push_events("\u{1b}]99;i=kitty:p=body;Kitty Body\u{1b}\\");
        assert_eq!(
            events,
            vec![notification(
                Some("Kitty Title"),
                Some("Kitty Body"),
                Some("kitty")
            )]
        );
    }

    // Parameters separated by `;` with the payload introduced by `:`.
    #[test]
    fn parses_doc_style_kitty_notification_payloads() {
        let events = parse_terminal_events("\u{1b}]99;i=1;e=1;d=0:Hello World\u{1b}\\");

        assert_eq!(
            events,
            vec![notification(Some("Hello World"), None, Some("1"))]
        );
    }
}