1use base64::{Engine as _, engine::general_purpose::STANDARD};
2use taskers_domain::{SignalEvent, SignalKind, SignalPaneMetadata};
3
/// Byte sequence that opens a taskers signal frame: ESC followed by `]777;taskers;`.
const OSC_PREFIX: &str = "\u{1b}]777;taskers;";
/// BEL control character (U+0007); one of the two accepted frame terminators.
const BEL: char = '\u{7}';
/// Two-character sequence ESC `\`; the other accepted frame terminator.
const ST: &str = "\u{1b}\\";
7
/// One signal decoded from a single frame of terminal output.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParsedSignal {
    /// Kind named by the frame's `kind` field.
    pub kind: SignalKind,
    /// Decoded `message` / `message_b64` field, if the frame carried one that decoded cleanly.
    pub message: Option<String>,
    /// Pane metadata; present only when the frame carried at least one metadata field.
    pub metadata: Option<SignalPaneMetadata>,
}
14
/// Incremental frame parser for output that arrives in arbitrary chunks.
///
/// Text that may still turn into a frame is buffered between calls to `push`.
#[derive(Debug, Default, Clone)]
pub struct SignalStreamParser {
    // Unconsumed input carried over from earlier `push` calls.
    pending: String,
}
19
20impl ParsedSignal {
21 pub fn into_event(self, source: impl Into<String>) -> SignalEvent {
22 SignalEvent::with_metadata(source, self.kind, self.message, self.metadata)
23 }
24}
25
26pub fn parse_signal_frames(buffer: &str) -> Vec<ParsedSignal> {
27 let mut parser = SignalStreamParser::default();
28 parser.push(buffer)
29}
30
31impl SignalStreamParser {
32 pub fn push(&mut self, chunk: &str) -> Vec<ParsedSignal> {
33 self.pending.push_str(chunk);
34
35 let mut frames = Vec::new();
36 let mut cursor = 0usize;
37 let mut keep_from = self.pending.len().saturating_sub(OSC_PREFIX.len());
38
39 while let Some(found) = self.pending[cursor..].find(OSC_PREFIX) {
40 let frame_start = cursor + found;
41 let content_start = frame_start + OSC_PREFIX.len();
42 let remainder = &self.pending[content_start..];
43
44 let Some((raw_frame, consumed)) = frame_slice(remainder) else {
45 keep_from = frame_start;
46 break;
47 };
48
49 if let Some(parsed) = parse_frame(raw_frame) {
50 frames.push(parsed);
51 }
52
53 cursor = content_start + consumed;
54 keep_from = cursor;
55 }
56
57 self.pending = self.pending[keep_from..].to_string();
58 frames
59 }
60}
61
62fn parse_frame(frame: &str) -> Option<ParsedSignal> {
63 let mut kind = None;
64 let mut message = None;
65 let mut title = None;
66 let mut cwd = None;
67 let mut repo_name = None;
68 let mut git_branch = None;
69 let mut agent_kind = None;
70 let mut agent_active = None;
71 let mut ports = None;
72
73 for part in frame.split(';') {
74 let (key, value) = part.split_once('=')?;
75 match key {
76 "kind" => {
77 kind = Some(match value {
78 "metadata" => SignalKind::Metadata,
79 "started" => SignalKind::Started,
80 "progress" => SignalKind::Progress,
81 "completed" => SignalKind::Completed,
82 "waiting_input" => SignalKind::WaitingInput,
83 "error" => SignalKind::Error,
84 "notification" => SignalKind::Notification,
85 _ => return None,
86 });
87 }
88 "message" => message = percent_decode(value),
89 "message_b64" => message = decode_base64(value),
90 "title" => title = percent_decode(value),
91 "title_b64" => title = decode_base64(value),
92 "cwd" => cwd = percent_decode(value),
93 "cwd_b64" => cwd = decode_base64(value),
94 "repo" | "repo_name" => repo_name = percent_decode(value),
95 "repo_b64" | "repo_name_b64" => repo_name = decode_base64(value),
96 "branch" | "git_branch" => git_branch = percent_decode(value),
97 "branch_b64" | "git_branch_b64" => git_branch = decode_base64(value),
98 "agent" | "agent_kind" => agent_kind = percent_decode(value),
99 "agent_b64" | "agent_kind_b64" => agent_kind = decode_base64(value),
100 "agent_active" => agent_active = parse_bool(value),
101 "agent_active_b64" => {
102 agent_active = decode_base64(value).and_then(|decoded| parse_bool(&decoded))
103 }
104 "ports" => ports = parse_ports(value),
105 "ports_b64" => ports = decode_base64(value).and_then(|decoded| parse_ports(&decoded)),
106 _ => {}
107 }
108 }
109
110 let metadata = if title.is_some()
111 || cwd.is_some()
112 || repo_name.is_some()
113 || git_branch.is_some()
114 || agent_kind.is_some()
115 || agent_active.is_some()
116 || ports.is_some()
117 {
118 Some(SignalPaneMetadata {
119 title,
120 cwd,
121 repo_name,
122 git_branch,
123 ports: ports.unwrap_or_default(),
124 agent_kind,
125 agent_active,
126 })
127 } else {
128 None
129 };
130
131 Some(ParsedSignal {
132 kind: kind?,
133 message,
134 metadata,
135 })
136}
137
/// Parses a comma-separated list of port numbers.
///
/// An empty string yields an empty list. Returns `None` as soon as any entry is
/// not a valid `u16` (this includes empty entries and surrounding whitespace).
fn parse_ports(value: &str) -> Option<Vec<u16>> {
    let mut ports = Vec::new();
    if !value.is_empty() {
        for token in value.split(',') {
            ports.push(token.parse::<u16>().ok()?);
        }
    }
    Some(ports)
}
148
/// Interprets a textual flag, ignoring surrounding whitespace and ASCII case.
///
/// `1`, `true`, `yes`, `on` give `Some(true)`; `0`, `false`, `no`, `off` give
/// `Some(false)`; anything else gives `None`.
fn parse_bool(value: &str) -> Option<bool> {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];
    const FALSY: [&str; 4] = ["0", "false", "no", "off"];

    let token = value.trim();
    let matches_any =
        |words: &[&str]| words.iter().any(|word| token.eq_ignore_ascii_case(word));

    if matches_any(&TRUTHY) {
        Some(true)
    } else if matches_any(&FALSY) {
        Some(false)
    } else {
        None
    }
}
156
157fn decode_base64(value: &str) -> Option<String> {
158 let decoded = STANDARD.decode(value).ok()?;
159 String::from_utf8(decoded).ok()
160}
161
162fn percent_decode(value: &str) -> Option<String> {
163 let mut bytes = Vec::with_capacity(value.len());
164 let raw = value.as_bytes();
165 let mut index = 0usize;
166
167 while index < raw.len() {
168 match raw[index] {
169 b'%' if index + 2 < raw.len() => {
170 let high = decode_hex(raw[index + 1])?;
171 let low = decode_hex(raw[index + 2])?;
172 bytes.push((high << 4) | low);
173 index += 3;
174 }
175 byte => {
176 bytes.push(byte);
177 index += 1;
178 }
179 }
180 }
181
182 String::from_utf8(bytes).ok()
183}
184
/// Converts one ASCII hex digit (`0-9`, `a-f`, `A-F`) to its numeric value.
///
/// Returns `None` for any other byte.
fn decode_hex(byte: u8) -> Option<u8> {
    if byte.is_ascii_digit() {
        return Some(byte - b'0');
    }

    let lowered = byte.to_ascii_lowercase();
    if (b'a'..=b'f').contains(&lowered) {
        return Some(lowered - b'a' + 10);
    }

    None
}
193
194fn frame_slice(remainder: &str) -> Option<(&str, usize)> {
195 if let Some(end) = remainder.find(BEL) {
196 return Some((&remainder[..end], end + BEL.len_utf8()));
197 }
198 if let Some(end) = remainder.find(ST) {
199 return Some((&remainder[..end], end + ST.len()));
200 }
201 None
202}
203
/// Unit tests for one-shot frame parsing and for the incremental stream parser.
#[cfg(test)]
mod tests {
    use base64::{Engine as _, engine::general_purpose::STANDARD};
    use taskers_domain::SignalKind;

    use super::{SignalStreamParser, parse_signal_frames};

    /// Two frames embedded in ordinary output, one closed by BEL and one by ESC `\`,
    /// are both extracted and the percent-encoded message is decoded.
    #[test]
    fn parses_multiple_frames_with_different_terminators() {
        let output = concat!(
            "hello",
            "\u{1b}]777;taskers;kind=waiting_input;message=Need%20approval\u{7}",
            "world",
            "\u{1b}]777;taskers;kind=completed;message=Done\u{1b}\\",
        );

        let frames = parse_signal_frames(output);

        assert_eq!(frames.len(), 2);
        assert_eq!(frames[0].kind, SignalKind::WaitingInput);
        assert_eq!(frames[0].message.as_deref(), Some("Need approval"));
        assert_eq!(frames[1].kind, SignalKind::Completed);
    }

    /// A frame whose `kind` value is not recognised produces no signal.
    #[test]
    fn ignores_unknown_frames() {
        let output = "\u{1b}]777;taskers;kind=unknown;message=Bad\u{7}";
        assert!(parse_signal_frames(output).is_empty());
    }

    /// A frame delivered in two chunks is reported only once its terminator arrives.
    #[test]
    fn stream_parser_handles_split_frames() {
        let mut parser = SignalStreamParser::default();

        assert!(
            parser
                .push("\u{1b}]777;taskers;kind=waiting_input;message=Need")
                .is_empty()
        );

        let frames = parser.push("%20approval\u{7}");
        assert_eq!(frames.len(), 1);
        assert_eq!(frames[0].kind, SignalKind::WaitingInput);
        assert_eq!(frames[0].message.as_deref(), Some("Need approval"));
    }

    /// Base64-encoded metadata fields (including non-ASCII text) and a plain
    /// `ports` list are decoded into the metadata snapshot.
    #[test]
    fn parses_metadata_snapshots_with_base64_fields() {
        let output = format!(
            "\u{1b}]777;taskers;kind=metadata;cwd_b64={};repo_b64={};branch_b64={};agent_b64={};title_b64={};ports=3000,8080\u{7}",
            STANDARD.encode("/home/notes/Projects/taskers"),
            STANDARD.encode("taskers"),
            STANDARD.encode("main"),
            STANDARD.encode("codex"),
            STANDARD.encode("codex · taskers"),
        );

        let frames = parse_signal_frames(&output);

        assert_eq!(frames.len(), 1);
        assert_eq!(frames[0].kind, SignalKind::Metadata);
        let metadata = frames[0].metadata.as_ref().expect("metadata snapshot");
        assert_eq!(
            metadata.cwd.as_deref(),
            Some("/home/notes/Projects/taskers")
        );
        assert_eq!(metadata.repo_name.as_deref(), Some("taskers"));
        assert_eq!(metadata.git_branch.as_deref(), Some("main"));
        assert_eq!(metadata.agent_kind.as_deref(), Some("codex"));
        assert_eq!(metadata.title.as_deref(), Some("codex · taskers"));
        assert_eq!(metadata.ports, vec![3000, 8080]);
    }
}