Skip to main content

taskers_runtime/
signals.rs

1use base64::{Engine as _, engine::general_purpose::STANDARD};
2use taskers_domain::{SignalEvent, SignalKind, SignalPaneMetadata};
3
4const OSC_PREFIX: &str = "\u{1b}]777;taskers;";
5const BEL: char = '\u{7}';
6const ST: &str = "\u{1b}\\";
7
8#[derive(Debug, Clone, PartialEq, Eq)]
9pub struct ParsedSignal {
10    pub kind: SignalKind,
11    pub message: Option<String>,
12    pub metadata: Option<SignalPaneMetadata>,
13}
14
15#[derive(Debug, Default, Clone)]
16pub struct SignalStreamParser {
17    pending: String,
18}
19
20impl ParsedSignal {
21    pub fn into_event(self, source: impl Into<String>) -> SignalEvent {
22        SignalEvent::with_metadata(source, self.kind, self.message, self.metadata)
23    }
24}
25
26pub fn parse_signal_frames(buffer: &str) -> Vec<ParsedSignal> {
27    let mut parser = SignalStreamParser::default();
28    parser.push(buffer)
29}
30
31impl SignalStreamParser {
32    pub fn push(&mut self, chunk: &str) -> Vec<ParsedSignal> {
33        self.pending.push_str(chunk);
34
35        let mut frames = Vec::new();
36        let mut cursor = 0usize;
37        let mut keep_from = self.pending.len().saturating_sub(OSC_PREFIX.len());
38
39        while let Some(found) = self.pending[cursor..].find(OSC_PREFIX) {
40            let frame_start = cursor + found;
41            let content_start = frame_start + OSC_PREFIX.len();
42            let remainder = &self.pending[content_start..];
43
44            let Some((raw_frame, consumed)) = frame_slice(remainder) else {
45                keep_from = frame_start;
46                break;
47            };
48
49            if let Some(parsed) = parse_frame(raw_frame) {
50                frames.push(parsed);
51            }
52
53            cursor = content_start + consumed;
54            keep_from = cursor;
55        }
56
57        self.pending = self.pending[keep_from..].to_string();
58        frames
59    }
60}
61
62fn parse_frame(frame: &str) -> Option<ParsedSignal> {
63    let mut kind = None;
64    let mut message = None;
65    let mut title = None;
66    let mut cwd = None;
67    let mut repo_name = None;
68    let mut git_branch = None;
69    let mut agent_kind = None;
70    let mut agent_active = None;
71    let mut ports = None;
72
73    for part in frame.split(';') {
74        let (key, value) = part.split_once('=')?;
75        match key {
76            "kind" => {
77                kind = Some(match value {
78                    "metadata" => SignalKind::Metadata,
79                    "started" => SignalKind::Started,
80                    "progress" => SignalKind::Progress,
81                    "completed" => SignalKind::Completed,
82                    "waiting_input" => SignalKind::WaitingInput,
83                    "error" => SignalKind::Error,
84                    "notification" => SignalKind::Notification,
85                    _ => return None,
86                });
87            }
88            "message" => message = percent_decode(value),
89            "message_b64" => message = decode_base64(value),
90            "title" => title = percent_decode(value),
91            "title_b64" => title = decode_base64(value),
92            "cwd" => cwd = percent_decode(value),
93            "cwd_b64" => cwd = decode_base64(value),
94            "repo" | "repo_name" => repo_name = percent_decode(value),
95            "repo_b64" | "repo_name_b64" => repo_name = decode_base64(value),
96            "branch" | "git_branch" => git_branch = percent_decode(value),
97            "branch_b64" | "git_branch_b64" => git_branch = decode_base64(value),
98            "agent" | "agent_kind" => agent_kind = percent_decode(value),
99            "agent_b64" | "agent_kind_b64" => agent_kind = decode_base64(value),
100            "agent_active" => agent_active = parse_bool(value),
101            "agent_active_b64" => {
102                agent_active = decode_base64(value).and_then(|decoded| parse_bool(&decoded))
103            }
104            "ports" => ports = parse_ports(value),
105            "ports_b64" => ports = decode_base64(value).and_then(|decoded| parse_ports(&decoded)),
106            _ => {}
107        }
108    }
109
110    let metadata = if title.is_some()
111        || cwd.is_some()
112        || repo_name.is_some()
113        || git_branch.is_some()
114        || agent_kind.is_some()
115        || agent_active.is_some()
116        || ports.is_some()
117    {
118        Some(SignalPaneMetadata {
119            title,
120            cwd,
121            repo_name,
122            git_branch,
123            ports: ports.unwrap_or_default(),
124            agent_kind,
125            agent_active,
126        })
127    } else {
128        None
129    };
130
131    Some(ParsedSignal {
132        kind: kind?,
133        message,
134        metadata,
135    })
136}
137
138fn parse_ports(value: &str) -> Option<Vec<u16>> {
139    if value.is_empty() {
140        return Some(Vec::new());
141    }
142
143    value
144        .split(',')
145        .map(|part| part.parse::<u16>().ok())
146        .collect::<Option<Vec<_>>>()
147}
148
149fn parse_bool(value: &str) -> Option<bool> {
150    match value.trim().to_ascii_lowercase().as_str() {
151        "1" | "true" | "yes" | "on" => Some(true),
152        "0" | "false" | "no" | "off" => Some(false),
153        _ => None,
154    }
155}
156
157fn decode_base64(value: &str) -> Option<String> {
158    let decoded = STANDARD.decode(value).ok()?;
159    String::from_utf8(decoded).ok()
160}
161
162fn percent_decode(value: &str) -> Option<String> {
163    let mut bytes = Vec::with_capacity(value.len());
164    let raw = value.as_bytes();
165    let mut index = 0usize;
166
167    while index < raw.len() {
168        match raw[index] {
169            b'%' if index + 2 < raw.len() => {
170                let high = decode_hex(raw[index + 1])?;
171                let low = decode_hex(raw[index + 2])?;
172                bytes.push((high << 4) | low);
173                index += 3;
174            }
175            byte => {
176                bytes.push(byte);
177                index += 1;
178            }
179        }
180    }
181
182    String::from_utf8(bytes).ok()
183}
184
185fn decode_hex(byte: u8) -> Option<u8> {
186    match byte {
187        b'0'..=b'9' => Some(byte - b'0'),
188        b'a'..=b'f' => Some(byte - b'a' + 10),
189        b'A'..=b'F' => Some(byte - b'A' + 10),
190        _ => None,
191    }
192}
193
194fn frame_slice(remainder: &str) -> Option<(&str, usize)> {
195    if let Some(end) = remainder.find(BEL) {
196        return Some((&remainder[..end], end + BEL.len_utf8()));
197    }
198    if let Some(end) = remainder.find(ST) {
199        return Some((&remainder[..end], end + ST.len()));
200    }
201    None
202}
203
204#[cfg(test)]
205mod tests {
206    use base64::{Engine as _, engine::general_purpose::STANDARD};
207    use taskers_domain::SignalKind;
208
209    use super::{SignalStreamParser, parse_signal_frames};
210
211    #[test]
212    fn parses_multiple_frames_with_different_terminators() {
213        let output = concat!(
214            "hello",
215            "\u{1b}]777;taskers;kind=waiting_input;message=Need%20approval\u{7}",
216            "world",
217            "\u{1b}]777;taskers;kind=completed;message=Done\u{1b}\\",
218        );
219
220        let frames = parse_signal_frames(output);
221
222        assert_eq!(frames.len(), 2);
223        assert_eq!(frames[0].kind, SignalKind::WaitingInput);
224        assert_eq!(frames[0].message.as_deref(), Some("Need approval"));
225        assert_eq!(frames[1].kind, SignalKind::Completed);
226    }
227
228    #[test]
229    fn ignores_unknown_frames() {
230        let output = "\u{1b}]777;taskers;kind=unknown;message=Bad\u{7}";
231        assert!(parse_signal_frames(output).is_empty());
232    }
233
234    #[test]
235    fn stream_parser_handles_split_frames() {
236        let mut parser = SignalStreamParser::default();
237
238        assert!(
239            parser
240                .push("\u{1b}]777;taskers;kind=waiting_input;message=Need")
241                .is_empty()
242        );
243
244        let frames = parser.push("%20approval\u{7}");
245        assert_eq!(frames.len(), 1);
246        assert_eq!(frames[0].kind, SignalKind::WaitingInput);
247        assert_eq!(frames[0].message.as_deref(), Some("Need approval"));
248    }
249
250    #[test]
251    fn parses_metadata_snapshots_with_base64_fields() {
252        let output = format!(
253            "\u{1b}]777;taskers;kind=metadata;cwd_b64={};repo_b64={};branch_b64={};agent_b64={};title_b64={};ports=3000,8080\u{7}",
254            STANDARD.encode("/home/notes/Projects/taskers"),
255            STANDARD.encode("taskers"),
256            STANDARD.encode("main"),
257            STANDARD.encode("codex"),
258            STANDARD.encode("codex · taskers"),
259        );
260
261        let frames = parse_signal_frames(&output);
262
263        assert_eq!(frames.len(), 1);
264        assert_eq!(frames[0].kind, SignalKind::Metadata);
265        let metadata = frames[0].metadata.as_ref().expect("metadata snapshot");
266        assert_eq!(
267            metadata.cwd.as_deref(),
268            Some("/home/notes/Projects/taskers")
269        );
270        assert_eq!(metadata.repo_name.as_deref(), Some("taskers"));
271        assert_eq!(metadata.git_branch.as_deref(), Some("main"));
272        assert_eq!(metadata.agent_kind.as_deref(), Some("codex"));
273        assert_eq!(metadata.title.as_deref(), Some("codex · taskers"));
274        assert_eq!(metadata.ports, vec![3000, 8080]);
275    }
276}