use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;
use tokio::task::AbortHandle;
use crate::engine::{Engine, Task};
use crate::event::Event;
use crate::runtime::recorder::{FsRecorder, Recorder, RunInputs};
#[derive(Default, Clone)]
pub struct RunRegistry {
inner: Arc<Mutex<HashMap<String, AbortHandle>>>,
}
impl RunRegistry {
pub fn new() -> Self {
Self::default()
}
pub fn insert(&self, run_id: String, handle: AbortHandle) {
if let Ok(mut g) = self.inner.lock() {
g.insert(run_id, handle);
}
}
pub fn remove(&self, run_id: &str) {
if let Ok(mut g) = self.inner.lock() {
g.remove(run_id);
}
}
pub fn abort(&self, run_id: &str) -> bool {
if let Ok(mut g) = self.inner.lock() {
if let Some(h) = g.remove(run_id) {
h.abort();
return true;
}
}
false
}
pub fn active_run_ids(&self) -> Vec<String> {
self.inner
.lock()
.map(|g| g.keys().cloned().collect())
.unwrap_or_default()
}
}
#[cfg(test)]
mod registry_tests {
use super::*;
#[tokio::test]
async fn abort_unknown_run_returns_false() {
let reg = RunRegistry::new();
assert!(!reg.abort("does-not-exist"));
}
#[tokio::test]
async fn abort_cancels_a_registered_task() {
let reg = RunRegistry::new();
let handle = tokio::spawn(async {
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
});
reg.insert("r1".into(), handle.abort_handle());
assert!(reg.abort("r1"));
let res = handle.await;
assert!(res.is_err() && res.unwrap_err().is_cancelled());
}
}
pub mod discord;
pub mod email;
pub mod extra_transports;
pub mod slack;
pub mod telegram;
pub mod ws;
#[derive(Debug, Clone)]
pub struct GatewayMessage {
pub surface: String,
pub user_id: String,
pub chat_id: String,
pub text: String,
pub message_id: Option<String>,
}
#[derive(Debug, Clone)]
pub struct GatewayResponse {
pub surface: String,
pub chat_id: String,
pub text: String,
pub reply_to: Option<String>,
pub buttons: Vec<Vec<String>>,
}
#[async_trait::async_trait]
pub trait GatewayTransport: Send + Sync {
fn name(&self) -> &str;
async fn start(&self, tx: mpsc::UnboundedSender<GatewayMessage>) -> anyhow::Result<()>;
async fn send(&self, response: GatewayResponse) -> anyhow::Result<()>;
async fn stop(&self) -> anyhow::Result<()>;
}
pub struct MessageRouter {
engine: Arc<Engine>,
recorder: Arc<FsRecorder>,
event_bus_tx: tokio::sync::broadcast::Sender<Event>,
allowed_users: Vec<String>,
sessions: Option<Arc<crate::runtime::session::SessionStore>>,
pub run_registry: RunRegistry,
}
impl MessageRouter {
pub fn new(
engine: Arc<Engine>,
recorder: Arc<FsRecorder>,
event_bus_tx: tokio::sync::broadcast::Sender<Event>,
allowed_users: Vec<String>,
) -> Self {
Self {
engine,
recorder,
event_bus_tx,
allowed_users,
sessions: None,
run_registry: RunRegistry::new(),
}
}
pub fn with_sessions(mut self, sessions: Arc<crate::runtime::session::SessionStore>) -> Self {
self.sessions = Some(sessions);
self
}
pub fn session_key(msg_user_id: &str, surface: &str, chat_id: &str) -> String {
let surface = session_component(surface, "surface");
let chat = session_component(chat_id, "channel");
let user = session_component(msg_user_id, "anonymous");
format!("gateway:{}:channel:{}:peer:{}", surface, chat, user)
}
pub async fn route(
&self,
msg: GatewayMessage,
responses: &mpsc::UnboundedSender<GatewayResponse>,
) {
if !self.allowed_users.is_empty() && !self.allowed_users.contains(&msg.user_id) {
let _ = responses.send(GatewayResponse {
surface: msg.surface.clone(),
chat_id: msg.chat_id.clone(),
text: "Unauthorized. Ask the admin to add your user ID.".into(),
reply_to: msg.message_id,
buttons: vec![],
});
return;
}
let text = msg.text.trim();
let surface = msg.surface.clone();
let chat_id = msg.chat_id.clone();
let user_id = msg.user_id.clone();
let reply_to = msg.message_id.clone();
if text.is_empty() {
return;
}
if text.starts_with('/') {
self.handle_command(text, surface, chat_id, user_id, reply_to, responses)
.await;
} else {
self.handle_task(text, surface, chat_id, user_id, reply_to, responses)
.await;
}
}
async fn handle_command(
&self,
text: &str,
surface: String,
chat_id: String,
user_id: String,
reply_to: Option<String>,
responses: &mpsc::UnboundedSender<GatewayResponse>,
) {
let parts: Vec<&str> = text.splitn(2, ' ').collect();
let cmd = parts[0].to_lowercase();
let args = parts.get(1).unwrap_or(&"");
match cmd.as_str() {
"/start" | "/help" => {
let _ = responses.send(GatewayResponse {
surface,
chat_id,
text: format!(
"Sparrow — one cli · grows with you\n\n\
Commands:\n\
/run <task> — Execute a task\n\
/status — Show engine status\n\
/models — List configured models\n\
/budget — Show budget status\n\
/help — This message\n\n\
Or just send a message to start a task."
),
reply_to,
buttons: vec![vec!["/run ".into(), "/status".into()]],
});
}
"/run" => {
if args.is_empty() {
let _ = responses.send(GatewayResponse {
surface,
chat_id,
text: "Usage: /run <task description>".into(),
reply_to,
buttons: vec![],
});
return;
}
self.handle_task(args, surface, chat_id, user_id, reply_to, responses)
.await;
}
"/reset" => {
if let Some(sessions) = &self.sessions {
let key = Self::session_key(&user_id, &surface, &chat_id);
let _ = sessions.delete(&key);
}
let _ = responses.send(GatewayResponse {
surface,
chat_id,
text: "Session cleared. Next message starts fresh.".into(),
reply_to,
buttons: vec![],
});
}
"/status" => {
let _ = responses.send(GatewayResponse {
surface,
chat_id,
text: "Engine: online\nMode: headless".into(),
reply_to,
buttons: vec![],
});
}
"/models" => {
let _ = responses.send(GatewayResponse {
surface,
chat_id,
text: "Use 'sparrow model --list' in CLI for model listing.".into(),
reply_to,
buttons: vec![],
});
}
"/budget" => {
let _ = responses.send(GatewayResponse {
surface,
chat_id,
text: "Budget: configured in ~/.config/sparrow/config.toml".into(),
reply_to,
buttons: vec![],
});
}
_ => {
let _ = responses.send(GatewayResponse {
surface,
chat_id,
text: format!("Unknown command: {}. Try /help", cmd),
reply_to,
buttons: vec![],
});
}
}
}
async fn handle_task(
&self,
text: &str,
surface: String,
chat_id: String,
user_id: String,
reply_to: Option<String>,
responses: &mpsc::UnboundedSender<GatewayResponse>,
) {
let task_text = text.to_string();
let resp_tx = responses.clone();
let cid = chat_id.clone();
let surface_for_done = surface.clone();
let resp_tx2 = resp_tx.clone();
let cid2 = cid.clone();
let surface_for_stream = surface.clone();
let reply_to2 = reply_to.clone();
let session_key = Self::session_key(&user_id, &surface, &chat_id);
let prior_msgs: Vec<crate::provider::Msg> = self
.sessions
.as_ref()
.and_then(|s| s.load(&session_key))
.and_then(|sess| serde_json::from_str(&sess.messages_json).ok())
.unwrap_or_default();
let sessions_for_save = self.sessions.clone();
let session_key_save = session_key.clone();
let prior_for_save = prior_msgs.clone();
let (task_tx, mut task_rx) = mpsc::unbounded_channel::<Event>();
let event_bus = self.event_bus_tx.clone();
let engine = self.engine.clone();
let recorder = self.recorder.clone();
let _ = resp_tx.send(GatewayResponse {
surface: surface.clone(),
chat_id: cid.clone(),
text: format!("Working on: {}", &task_text[..task_text.len().min(80)]),
reply_to: reply_to.clone(),
buttons: vec![],
});
let run_id = uuid::Uuid::new_v4().to_string();
recorder.start_run(
run_id.clone(),
RunInputs {
task: task_text.clone(),
config_snapshot: serde_json::json!({}),
model_id: "gateway".into(),
repo_head: None,
timestamp: chrono::Utc::now().to_rfc3339(),
agent: "gateway".into(),
},
);
let registry = self.run_registry.clone();
let run_id_for_dereg = run_id.clone();
let drive_handle = tokio::spawn(async move {
let task = Task {
description: task_text.clone(),
context: prior_msgs,
};
match engine.drive(task, task_tx.clone()).await {
Ok(outcome) => {
let _ = event_bus.send(Event::RunFinished {
run: crate::event::RunId(run_id.clone()),
outcome: outcome.clone(),
});
let _ = recorder.finalize(&run_id);
let _ = resp_tx.send(GatewayResponse {
surface: surface_for_done,
chat_id: cid.clone(),
text: format!(
"Done.\nStatus: {}\nCost: ${:.4}\nFiles: {}",
outcome.status,
outcome.cost_usd,
outcome.diffs.len()
),
reply_to: reply_to.clone(),
buttons: vec![],
});
}
Err(e) => {
let _ = resp_tx.send(GatewayResponse {
surface: surface_for_done,
chat_id: cid,
text: format!("Error: {}", e),
reply_to: reply_to2,
buttons: vec![],
});
}
}
drop(task_tx);
});
self.run_registry
.insert(run_id_for_dereg.clone(), drive_handle.abort_handle());
{
let registry_for_dereg = registry.clone();
tokio::spawn(async move {
let _ = drive_handle.await;
registry_for_dereg.remove(&run_id_for_dereg);
});
}
let user_task_text = text.to_string();
tokio::spawn(async move {
let mut buffer = String::new();
let mut full_reply = String::new();
let mut reasoning_reply = String::new();
while let Some(event) = task_rx.recv().await {
if let Event::ThinkingDelta { text, .. } = &event {
full_reply.push_str(text);
}
if let Event::ReasoningDelta { text, .. } = &event {
reasoning_reply.push_str(text);
}
match &event {
Event::ThinkingDelta { text, .. } => {
buffer.push_str(text);
if buffer.len() > 500 || buffer.contains('\n') {
let _ = resp_tx2.send(GatewayResponse {
surface: surface_for_stream.clone(),
chat_id: cid2.clone(),
text: buffer.clone(),
reply_to: None,
buttons: vec![],
});
buffer.clear();
}
}
Event::ToolUseProposed { name, .. } => {
if !buffer.is_empty() {
let _ = resp_tx2.send(GatewayResponse {
surface: surface_for_stream.clone(),
chat_id: cid2.clone(),
text: buffer.clone(),
reply_to: None,
buttons: vec![],
});
buffer.clear();
}
let _ = resp_tx2.send(GatewayResponse {
surface: surface_for_stream.clone(),
chat_id: cid2.clone(),
text: format!("[Tool: {}]", name),
reply_to: None,
buttons: vec![],
});
}
Event::ModelSwitched {
from, to, reason, ..
} => {
if !buffer.is_empty() {
let _ = resp_tx2.send(GatewayResponse {
surface: surface_for_stream.clone(),
chat_id: cid2.clone(),
text: buffer.clone(),
reply_to: None,
buttons: vec![],
});
buffer.clear();
}
let clean = crate::event::friendly_model_switch_reason(reason);
let text = if crate::event::is_local_model_unavailable(reason) {
format!("modèle local indisponible → routage modèle cloud ({})", to)
} else {
format!("fallback: {} → {} ({})", from, to, clean)
};
let _ = resp_tx2.send(GatewayResponse {
surface: surface_for_stream.clone(),
chat_id: cid2.clone(),
text,
reply_to: None,
buttons: vec![],
});
}
Event::ApprovalRequested { summary, .. } => {
let _ = resp_tx2.send(GatewayResponse {
surface: surface_for_stream.clone(),
chat_id: cid2.clone(),
text: format!("Approval needed: {}", summary),
reply_to: None,
buttons: vec![vec!["/approve".into(), "/deny".into()]],
});
}
_ => {}
}
}
if !buffer.is_empty() {
let _ = resp_tx2.send(GatewayResponse {
surface: surface_for_stream,
chat_id: cid2.clone(),
text: buffer,
reply_to: None,
buttons: vec![],
});
}
if let Some(sessions) = &sessions_for_save {
let mut updated = prior_for_save;
updated.push(crate::provider::Msg {
role: "user".into(),
content: vec![crate::provider::ContentBlock::Text {
text: user_task_text,
}],
});
if !full_reply.trim().is_empty() {
let mut content = Vec::new();
if !reasoning_reply.trim().is_empty() {
content.push(crate::provider::ContentBlock::Reasoning {
text: reasoning_reply,
});
}
content.push(crate::provider::ContentBlock::Text { text: full_reply });
updated.push(crate::provider::Msg {
role: "assistant".into(),
content,
});
}
let len = updated.len();
if len > 40 {
updated.drain(..len - 40);
}
let _ = sessions.save(&session_key_save, &updated, None);
}
});
}
}
pub fn format_event(event: &Event) -> Option<String> {
match event {
Event::RunStarted { task, agent, .. } => {
Some(format!("Started: {} (agent: {})", task, agent))
}
Event::RunFinished { outcome, .. } => Some(format!(
"Finished: {} | Cost: ${:.4} | Files: {}",
outcome.status,
outcome.cost_usd,
outcome.diffs.len()
)),
Event::ThinkingDelta { text, .. } => Some(text.clone()),
Event::ReasoningDelta { .. } => None,
Event::ModelSwitched {
from, to, reason, ..
} => {
let clean = crate::event::friendly_model_switch_reason(reason);
if crate::event::is_local_model_unavailable(reason) {
Some(format!(
"modèle local indisponible → routage modèle cloud ({})",
to
))
} else {
Some(format!("Fallback: {} → {} ({})", from, to, clean))
}
}
Event::ToolUseProposed { name, .. } => Some(format!("[{}]", name)),
Event::ApprovalRequested { summary, .. } => Some(format!("Approve: {}", summary)),
Event::Error { message, .. } => {
if crate::event::is_local_model_unavailable(message) {
None
} else {
Some(format!("Error: {}", message))
}
}
Event::CostUpdate { usd, .. } => Some(format!("Cost: ${:.4}", usd)),
Event::CheckpointCreated { label, .. } => Some(format!("Checkpoint: {}", label)),
_ => None,
}
}
fn session_component(value: &str, fallback: &str) -> String {
let cleaned = value
.chars()
.map(|ch| {
if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_' | '.') {
ch
} else {
'_'
}
})
.collect::<String>()
.trim_matches('_')
.to_string();
if cleaned.is_empty() {
fallback.to_string()
} else {
cleaned
}
}