Skip to main content

sparrow/streaming/
mod.rs

//! Live streaming infrastructure for Sparrow.
//!
//! Provides channels for real-time event streaming and a `LiveDisplay`
//! that renders progress bars, spinners, and status updates in the terminal.
5
6use std::time::Instant;
7
8pub mod lane;
9pub mod progress;
10
/// An event reported over the stream while a task is executing.
///
/// Every variant owns its payload, so events can be cloned and moved
/// between the sending and receiving sides freely.
#[derive(Clone, Debug)]
pub enum StreamEvent {
    /// A tool is starting.
    ToolStart {
        /// Name of the tool.
        name: String,
        /// Text describing the tool invocation.
        description: String,
    },
    /// A tool produced output.
    ToolOutput {
        /// Name of the tool.
        name: String,
        /// The output text; may span multiple lines.
        output: String,
    },
    /// The agent sent a message.
    AgentMessage {
        /// Message text.
        content: String,
    },
    /// Task progress update.
    TaskProgress {
        /// Progress counter reached so far.
        current: u64,
        /// Value of the counter at which the task is complete.
        total: u64,
        /// Text describing the progress being reported.
        description: String,
    },
    /// The task completed.
    TaskComplete {
        /// Summary text for the finished task.
        summary: String,
    },
    /// An error occurred.
    Error {
        /// Error text.
        message: String,
    },
}
27
28/// Sender side of the stream channel.
29pub struct StreamSender {
30    tx: tokio::sync::mpsc::UnboundedSender<StreamEvent>,
31}
32
33impl StreamSender {
34    pub fn send(&self, event: StreamEvent) {
35        let _ = self.tx.send(event);
36    }
37}
38
39/// Receiver side with live terminal display.
40pub struct StreamReceiver {
41    rx: tokio::sync::mpsc::UnboundedReceiver<StreamEvent>,
42    display: LiveDisplay,
43}
44
45impl StreamReceiver {
46    pub fn display(&mut self) -> &mut LiveDisplay {
47        &mut self.display
48    }
49
50    /// Process incoming events and update the display.
51    pub async fn process(&mut self) -> Option<StreamEvent> {
52        match self.rx.recv().await {
53            Some(event) => {
54                self.display.update(&event);
55                Some(event)
56            }
57            None => None,
58        }
59    }
60}
61
62/// Create a streaming channel pair.
63pub fn create_channel() -> (StreamSender, StreamReceiver) {
64    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
65    let sender = StreamSender { tx };
66    let receiver = StreamReceiver {
67        rx,
68        display: LiveDisplay::new(),
69    };
70    (sender, receiver)
71}
72
73/// Live terminal display for streaming task execution.
74pub struct LiveDisplay {
75    start_time: Instant,
76    current_tool: Option<String>,
77    task_description: String,
78    steps_done: u64,
79    steps_total: u64,
80}
81
82impl LiveDisplay {
83    pub fn new() -> Self {
84        Self {
85            start_time: Instant::now(),
86            current_tool: None,
87            task_description: String::new(),
88            steps_done: 0,
89            steps_total: 0,
90        }
91    }
92
93    /// Update the display with a new event.
94    pub fn update(&mut self, event: &StreamEvent) {
95        match event {
96            StreamEvent::ToolStart { name, description } => {
97                self.current_tool = Some(name.clone());
98                let elapsed = self.start_time.elapsed().as_secs();
99                eprintln!(
100                    "\x1b[36m[{elapsed:>3}s]\x1b[0m \x1b[33m🔧 {name}\x1b[0m — {description}"
101                );
102            }
103            StreamEvent::ToolOutput { name, output } => {
104                let preview: String = output.lines().take(3).map(|l| format!("  │ {l}")).collect::<Vec<_>>().join("\n");
105                if !preview.is_empty() {
106                    eprintln!("{preview}");
107                    if output.lines().count() > 3 {
108                        eprintln!("  │ \x1b[2m... ({} more lines)\x1b[0m", output.lines().count() - 3);
109                    }
110                }
111            }
112            StreamEvent::AgentMessage { content } => {
113                eprintln!("\x1b[35m💬\x1b[0m {content}");
114            }
115            StreamEvent::TaskProgress { current, total, description } => {
116                self.steps_done = *current;
117                self.steps_total = *total;
118                self.task_description = description.clone();
119                let pct = if *total > 0 { (current * 100) / total } else { 0 };
120                let bar = "â–ˆ".repeat(pct as usize / 5) + &"â–‘".repeat(20 - pct as usize / 5);
121                let elapsed = self.start_time.elapsed().as_secs();
122                eprintln!(
123                    "\x1b[36m[{elapsed:>3}s]\x1b[0m [{bar}] {pct}% — {description}"
124                );
125            }
126            StreamEvent::TaskComplete { summary } => {
127                let elapsed = self.start_time.elapsed().as_secs_f64();
128                eprintln!("\x1b[32m✓\x1b[0m {summary} \x1b[2m({elapsed:.1}s)\x1b[0m");
129            }
130            StreamEvent::Error { message } => {
131                eprintln!("\x1b[31m✗\x1b[0m {message}");
132            }
133        }
134    }
135
136    /// Finish the display and print a summary.
137    pub fn finish(&mut self) {
138        let elapsed = self.start_time.elapsed().as_secs_f64();
139        eprintln!("\n\x1b[1mDone in {elapsed:.1}s\x1b[0m — {}/{} steps completed", self.steps_done, self.steps_total);
140    }
141
142    /// Get the elapsed time in seconds.
143    pub fn elapsed(&self) -> f64 {
144        self.start_time.elapsed().as_secs_f64()
145    }
146}