1use std::collections::HashMap;
2use std::sync::{Arc, Mutex};
3use tokio::sync::mpsc;
4use tokio::task::AbortHandle;
5
6use crate::engine::{Engine, Task};
7use crate::event::Event;
8use crate::runtime::recorder::{FsRecorder, Recorder, RunInputs};
9
10#[derive(Default, Clone)]
14pub struct RunRegistry {
15 inner: Arc<Mutex<HashMap<String, AbortHandle>>>,
16}
17
18impl RunRegistry {
19 pub fn new() -> Self {
20 Self::default()
21 }
22
23 pub fn insert(&self, run_id: String, handle: AbortHandle) {
24 if let Ok(mut g) = self.inner.lock() {
25 g.insert(run_id, handle);
26 }
27 }
28
29 pub fn remove(&self, run_id: &str) {
30 if let Ok(mut g) = self.inner.lock() {
31 g.remove(run_id);
32 }
33 }
34
35 pub fn abort(&self, run_id: &str) -> bool {
37 if let Ok(mut g) = self.inner.lock() {
38 if let Some(h) = g.remove(run_id) {
39 h.abort();
40 return true;
41 }
42 }
43 false
44 }
45
46 pub fn active_run_ids(&self) -> Vec<String> {
47 self.inner
48 .lock()
49 .map(|g| g.keys().cloned().collect())
50 .unwrap_or_default()
51 }
52}
53
54#[cfg(test)]
55mod registry_tests {
56 use super::*;
57
58 #[tokio::test]
59 async fn abort_unknown_run_returns_false() {
60 let reg = RunRegistry::new();
61 assert!(!reg.abort("does-not-exist"));
62 }
63
64 #[tokio::test]
65 async fn abort_cancels_a_registered_task() {
66 let reg = RunRegistry::new();
67 let handle = tokio::spawn(async {
68 tokio::time::sleep(std::time::Duration::from_secs(30)).await;
69 });
70 reg.insert("r1".into(), handle.abort_handle());
71 assert!(reg.abort("r1"));
72 let res = handle.await;
74 assert!(res.is_err() && res.unwrap_err().is_cancelled());
75 }
76}
77
78pub mod discord;
79pub mod email;
80pub mod extra_transports;
81pub mod slack;
82pub mod telegram;
83pub mod ws;
84
85#[derive(Debug, Clone)]
88pub struct GatewayMessage {
89 pub surface: String,
90 pub user_id: String,
91 pub chat_id: String,
92 pub text: String,
93 pub message_id: Option<String>,
94}
95
96#[derive(Debug, Clone)]
97pub struct GatewayResponse {
98 pub surface: String,
99 pub chat_id: String,
100 pub text: String,
101 pub reply_to: Option<String>,
102 pub buttons: Vec<Vec<String>>,
103}
104
105#[async_trait::async_trait]
108pub trait GatewayTransport: Send + Sync {
109 fn name(&self) -> &str;
110 async fn start(&self, tx: mpsc::UnboundedSender<GatewayMessage>) -> anyhow::Result<()>;
111 async fn send(&self, response: GatewayResponse) -> anyhow::Result<()>;
112 async fn stop(&self) -> anyhow::Result<()>;
113}
114
115pub struct MessageRouter {
118 engine: Arc<Engine>,
119 recorder: Arc<FsRecorder>,
120 event_bus_tx: tokio::sync::broadcast::Sender<Event>,
121 allowed_users: Vec<String>,
122 sessions: Option<Arc<crate::runtime::session::SessionStore>>,
125 pub run_registry: RunRegistry,
127}
128
129impl MessageRouter {
130 pub fn new(
131 engine: Arc<Engine>,
132 recorder: Arc<FsRecorder>,
133 event_bus_tx: tokio::sync::broadcast::Sender<Event>,
134 allowed_users: Vec<String>,
135 ) -> Self {
136 Self {
137 engine,
138 recorder,
139 event_bus_tx,
140 allowed_users,
141 sessions: None,
142 run_registry: RunRegistry::new(),
143 }
144 }
145
146 pub fn with_sessions(mut self, sessions: Arc<crate::runtime::session::SessionStore>) -> Self {
148 self.sessions = Some(sessions);
149 self
150 }
151
152 pub fn session_key(msg_user_id: &str, surface: &str, chat_id: &str) -> String {
156 let surface = session_component(surface, "surface");
157 let chat = session_component(chat_id, "channel");
158 let user = session_component(msg_user_id, "anonymous");
159 format!("gateway:{}:channel:{}:peer:{}", surface, chat, user)
160 }
161
162 pub async fn route(
164 &self,
165 msg: GatewayMessage,
166 responses: &mpsc::UnboundedSender<GatewayResponse>,
167 ) {
168 if !self.allowed_users.is_empty() && !self.allowed_users.contains(&msg.user_id) {
170 let _ = responses.send(GatewayResponse {
171 surface: msg.surface.clone(),
172 chat_id: msg.chat_id.clone(),
173 text: "Unauthorized. Ask the admin to add your user ID.".into(),
174 reply_to: msg.message_id,
175 buttons: vec![],
176 });
177 return;
178 }
179
180 let text = msg.text.trim();
181 let surface = msg.surface.clone();
182 let chat_id = msg.chat_id.clone();
183 let user_id = msg.user_id.clone();
184 let reply_to = msg.message_id.clone();
185
186 if text.is_empty() {
187 return;
188 }
189
190 if text.starts_with('/') {
192 self.handle_command(text, surface, chat_id, user_id, reply_to, responses)
193 .await;
194 } else {
195 self.handle_task(text, surface, chat_id, user_id, reply_to, responses)
196 .await;
197 }
198 }
199
200 async fn handle_command(
201 &self,
202 text: &str,
203 surface: String,
204 chat_id: String,
205 user_id: String,
206 reply_to: Option<String>,
207 responses: &mpsc::UnboundedSender<GatewayResponse>,
208 ) {
209 let parts: Vec<&str> = text.splitn(2, ' ').collect();
210 let cmd = parts[0].to_lowercase();
211 let args = parts.get(1).unwrap_or(&"");
212
213 match cmd.as_str() {
214 "/start" | "/help" => {
215 let _ = responses.send(GatewayResponse {
216 surface,
217 chat_id,
218 text: format!(
219 "Sparrow — one cli · grows with you\n\n\
220 Commands:\n\
221 /run <task> — Execute a task\n\
222 /status — Show engine status\n\
223 /models — List configured models\n\
224 /budget — Show budget status\n\
225 /help — This message\n\n\
226 Or just send a message to start a task."
227 ),
228 reply_to,
229 buttons: vec![vec!["/run ".into(), "/status".into()]],
230 });
231 }
232 "/run" => {
233 if args.is_empty() {
234 let _ = responses.send(GatewayResponse {
235 surface,
236 chat_id,
237 text: "Usage: /run <task description>".into(),
238 reply_to,
239 buttons: vec![],
240 });
241 return;
242 }
243 self.handle_task(args, surface, chat_id, user_id, reply_to, responses)
244 .await;
245 }
246 "/reset" => {
247 if let Some(sessions) = &self.sessions {
249 let key = Self::session_key(&user_id, &surface, &chat_id);
250 let _ = sessions.delete(&key);
251 }
252 let _ = responses.send(GatewayResponse {
253 surface,
254 chat_id,
255 text: "Session cleared. Next message starts fresh.".into(),
256 reply_to,
257 buttons: vec![],
258 });
259 }
260 "/status" => {
261 let _ = responses.send(GatewayResponse {
262 surface,
263 chat_id,
264 text: "Engine: online\nMode: headless".into(),
265 reply_to,
266 buttons: vec![],
267 });
268 }
269 "/models" => {
270 let _ = responses.send(GatewayResponse {
271 surface,
272 chat_id,
273 text: "Use 'sparrow model --list' in CLI for model listing.".into(),
274 reply_to,
275 buttons: vec![],
276 });
277 }
278 "/budget" => {
279 let _ = responses.send(GatewayResponse {
280 surface,
281 chat_id,
282 text: "Budget: configured in ~/.config/sparrow/config.toml".into(),
283 reply_to,
284 buttons: vec![],
285 });
286 }
287 _ => {
288 let _ = responses.send(GatewayResponse {
289 surface,
290 chat_id,
291 text: format!("Unknown command: {}. Try /help", cmd),
292 reply_to,
293 buttons: vec![],
294 });
295 }
296 }
297 }
298
299 async fn handle_task(
300 &self,
301 text: &str,
302 surface: String,
303 chat_id: String,
304 user_id: String,
305 reply_to: Option<String>,
306 responses: &mpsc::UnboundedSender<GatewayResponse>,
307 ) {
308 let task_text = text.to_string();
309 let resp_tx = responses.clone();
310 let cid = chat_id.clone();
311 let surface_for_done = surface.clone();
312
313 let resp_tx2 = resp_tx.clone();
315 let cid2 = cid.clone();
316 let surface_for_stream = surface.clone();
317 let reply_to2 = reply_to.clone();
318
319 let session_key = Self::session_key(&user_id, &surface, &chat_id);
323 let prior_msgs: Vec<crate::provider::Msg> = self
324 .sessions
325 .as_ref()
326 .and_then(|s| s.load(&session_key))
327 .and_then(|sess| serde_json::from_str(&sess.messages_json).ok())
328 .unwrap_or_default();
329 let sessions_for_save = self.sessions.clone();
330 let session_key_save = session_key.clone();
331 let prior_for_save = prior_msgs.clone();
332
333 let (task_tx, mut task_rx) = mpsc::unbounded_channel::<Event>();
335 let event_bus = self.event_bus_tx.clone();
336 let engine = self.engine.clone();
337 let recorder = self.recorder.clone();
338
339 let _ = resp_tx.send(GatewayResponse {
341 surface: surface.clone(),
342 chat_id: cid.clone(),
343 text: format!("Working on: {}", &task_text[..task_text.len().min(80)]),
344 reply_to: reply_to.clone(),
345 buttons: vec![],
346 });
347
348 let run_id = uuid::Uuid::new_v4().to_string();
350 recorder.start_run(
351 run_id.clone(),
352 RunInputs {
353 task: task_text.clone(),
354 config_snapshot: serde_json::json!({}),
355 model_id: "gateway".into(),
356 repo_head: None,
357 timestamp: chrono::Utc::now().to_rfc3339(),
358 agent: "gateway".into(),
359 },
360 );
361
362 let registry = self.run_registry.clone();
363 let run_id_for_dereg = run_id.clone();
364 let drive_handle = tokio::spawn(async move {
365 let task = Task {
366 description: task_text.clone(),
367 context: prior_msgs,
368 };
369
370 match engine.drive(task, task_tx.clone()).await {
371 Ok(outcome) => {
372 let _ = event_bus.send(Event::RunFinished {
373 run: crate::event::RunId(run_id.clone()),
374 outcome: outcome.clone(),
375 });
376 let _ = recorder.finalize(&run_id);
377 let _ = resp_tx.send(GatewayResponse {
378 surface: surface_for_done,
379 chat_id: cid.clone(),
380 text: format!(
381 "Done.\nStatus: {}\nCost: ${:.4}\nFiles: {}{}",
382 outcome.status,
383 outcome.cost_usd,
384 outcome.diffs.len(),
385 crate::cost::format_comparison_oneliner(
386 outcome.cost_usd,
387 &outcome.tokens
388 )
389 ),
390 reply_to: reply_to.clone(),
391 buttons: vec![],
392 });
393 }
394 Err(e) => {
395 let _ = resp_tx.send(GatewayResponse {
396 surface: surface_for_done,
397 chat_id: cid,
398 text: format!("Error: {}", e),
399 reply_to: reply_to2,
400 buttons: vec![],
401 });
402 }
403 }
404
405 drop(task_tx);
406 });
407 self.run_registry
408 .insert(run_id_for_dereg.clone(), drive_handle.abort_handle());
409 {
411 let registry_for_dereg = registry.clone();
412 tokio::spawn(async move {
413 let _ = drive_handle.await;
414 registry_for_dereg.remove(&run_id_for_dereg);
415 });
416 }
417
418 let user_task_text = text.to_string();
420 tokio::spawn(async move {
421 let mut buffer = String::new();
422 let mut full_reply = String::new();
423 let mut reasoning_reply = String::new();
424 while let Some(event) = task_rx.recv().await {
425 if let Event::ThinkingDelta { text, .. } = &event {
426 full_reply.push_str(text);
427 }
428 if let Event::ReasoningDelta { text, .. } = &event {
429 reasoning_reply.push_str(text);
430 }
431 match &event {
432 Event::ThinkingDelta { text, .. } => {
433 buffer.push_str(text);
434 if buffer.len() > 500 || buffer.contains('\n') {
435 let _ = resp_tx2.send(GatewayResponse {
436 surface: surface_for_stream.clone(),
437 chat_id: cid2.clone(),
438 text: buffer.clone(),
439 reply_to: None,
440 buttons: vec![],
441 });
442 buffer.clear();
443 }
444 }
445 Event::ToolUseProposed { name, .. } => {
446 if !buffer.is_empty() {
447 let _ = resp_tx2.send(GatewayResponse {
448 surface: surface_for_stream.clone(),
449 chat_id: cid2.clone(),
450 text: buffer.clone(),
451 reply_to: None,
452 buttons: vec![],
453 });
454 buffer.clear();
455 }
456 let _ = resp_tx2.send(GatewayResponse {
457 surface: surface_for_stream.clone(),
458 chat_id: cid2.clone(),
459 text: format!("[Tool: {}]", name),
460 reply_to: None,
461 buttons: vec![],
462 });
463 }
464 Event::ModelSwitched {
465 from, to, reason, ..
466 } => {
467 if !buffer.is_empty() {
468 let _ = resp_tx2.send(GatewayResponse {
469 surface: surface_for_stream.clone(),
470 chat_id: cid2.clone(),
471 text: buffer.clone(),
472 reply_to: None,
473 buttons: vec![],
474 });
475 buffer.clear();
476 }
477 let clean = crate::event::friendly_model_switch_reason(reason);
478 let text = if crate::event::is_local_model_unavailable(reason) {
479 format!("modèle local indisponible → routage modèle cloud ({})", to)
480 } else {
481 format!("fallback: {} → {} ({})", from, to, clean)
482 };
483 let _ = resp_tx2.send(GatewayResponse {
484 surface: surface_for_stream.clone(),
485 chat_id: cid2.clone(),
486 text,
487 reply_to: None,
488 buttons: vec![],
489 });
490 }
491 Event::ApprovalRequested { summary, .. } => {
492 let _ = resp_tx2.send(GatewayResponse {
493 surface: surface_for_stream.clone(),
494 chat_id: cid2.clone(),
495 text: format!("Approval needed: {}", summary),
496 reply_to: None,
497 buttons: vec![vec!["/approve".into(), "/deny".into()]],
498 });
499 }
500 _ => {}
501 }
502 }
503 if !buffer.is_empty() {
504 let _ = resp_tx2.send(GatewayResponse {
505 surface: surface_for_stream,
506 chat_id: cid2.clone(),
507 text: buffer,
508 reply_to: None,
509 buttons: vec![],
510 });
511 }
512
513 if let Some(sessions) = &sessions_for_save {
517 let mut updated = prior_for_save;
518 updated.push(crate::provider::Msg {
519 role: "user".into(),
520 content: vec![crate::provider::ContentBlock::Text {
521 text: user_task_text,
522 }],
523 });
524 if !full_reply.trim().is_empty() {
525 let mut content = Vec::new();
526 if !reasoning_reply.trim().is_empty() {
527 content.push(crate::provider::ContentBlock::Reasoning {
528 text: reasoning_reply,
529 });
530 }
531 content.push(crate::provider::ContentBlock::Text { text: full_reply });
532 updated.push(crate::provider::Msg {
533 role: "assistant".into(),
534 content,
535 });
536 }
537 let len = updated.len();
539 if len > 40 {
540 updated.drain(..len - 40);
541 }
542 let _ = sessions.save(&session_key_save, &updated, None);
543 }
544 });
545 }
546}
547
548pub fn format_event(event: &Event) -> Option<String> {
551 match event {
552 Event::RunStarted { task, agent, .. } => {
553 Some(format!("Started: {} (agent: {})", task, agent))
554 }
555 Event::RunFinished { outcome, .. } => Some(format!(
556 "Finished: {} | Cost: ${:.4} | Files: {}",
557 outcome.status,
558 outcome.cost_usd,
559 outcome.diffs.len()
560 )),
561 Event::ThinkingDelta { text, .. } => Some(text.clone()),
562 Event::ReasoningDelta { .. } => None,
563 Event::ModelSwitched {
564 from, to, reason, ..
565 } => {
566 let clean = crate::event::friendly_model_switch_reason(reason);
567 if crate::event::is_local_model_unavailable(reason) {
568 Some(format!(
569 "modèle local indisponible → routage modèle cloud ({})",
570 to
571 ))
572 } else {
573 Some(format!("Fallback: {} → {} ({})", from, to, clean))
574 }
575 }
576 Event::ToolUseProposed { name, .. } => Some(format!("[{}]", name)),
577 Event::ApprovalRequested { summary, .. } => Some(format!("Approve: {}", summary)),
578 Event::Error { message, .. } => {
579 if crate::event::is_local_model_unavailable(message) {
580 None
581 } else {
582 Some(format!("Error: {}", message))
583 }
584 }
585 Event::CostUpdate { usd, .. } => Some(format!("Cost: ${:.4}", usd)),
586 Event::CheckpointCreated { label, .. } => Some(format!("Checkpoint: {}", label)),
587 _ => None,
588 }
589}
590
591fn session_component(value: &str, fallback: &str) -> String {
592 let cleaned = value
593 .chars()
594 .map(|ch| {
595 if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_' | '.') {
596 ch
597 } else {
598 '_'
599 }
600 })
601 .collect::<String>()
602 .trim_matches('_')
603 .to_string();
604 if cleaned.is_empty() {
605 fallback.to_string()
606 } else {
607 cleaned
608 }
609}