Skip to main content

sparrow/streaming/
mod.rs

1//! Live streaming infrastructure for Sparrow.
2//!
3//! Provides channels for real-time event streaming and a LiveDisplay
4//! that renders progress bars, spinners, and status updates in the terminal.
5
6use std::time::Instant;
7
8pub mod lane;
9pub mod progress;
10
/// Events emitted during task execution.
///
/// Every variant owns its payload, so an event can be cloned and moved across
/// the stream channel freely. Events compare by value, which lets callers and
/// tests check an observed event with `==` / `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamEvent {
    /// A tool is starting.
    ToolStart {
        /// Name of the tool being started.
        name: String,
        /// Text describing the tool invocation.
        description: String,
    },
    /// A tool produced output.
    ToolOutput {
        /// Name of the tool that produced the output.
        name: String,
        /// The output text; may span multiple lines.
        output: String,
    },
    /// The agent sent a message.
    AgentMessage {
        /// Message text.
        content: String,
    },
    /// Task progress update.
    TaskProgress {
        /// Progress counter reported so far.
        current: u64,
        /// Value of `current` that corresponds to completion.
        total: u64,
        /// Text describing the work in progress.
        description: String,
    },
    /// The task completed.
    TaskComplete {
        /// Summary of the finished task.
        summary: String,
    },
    /// An error occurred.
    Error {
        /// Error text.
        message: String,
    },
}
31
32/// Sender side of the stream channel.
33pub struct StreamSender {
34    tx: tokio::sync::mpsc::UnboundedSender<StreamEvent>,
35}
36
37impl StreamSender {
38    pub fn send(&self, event: StreamEvent) {
39        let _ = self.tx.send(event);
40    }
41}
42
43/// Receiver side with live terminal display.
44pub struct StreamReceiver {
45    rx: tokio::sync::mpsc::UnboundedReceiver<StreamEvent>,
46    display: LiveDisplay,
47}
48
49impl StreamReceiver {
50    pub fn display(&mut self) -> &mut LiveDisplay {
51        &mut self.display
52    }
53
54    /// Process incoming events and update the display.
55    pub async fn process(&mut self) -> Option<StreamEvent> {
56        match self.rx.recv().await {
57            Some(event) => {
58                self.display.update(&event);
59                Some(event)
60            }
61            None => None,
62        }
63    }
64}
65
66/// Create a streaming channel pair.
67pub fn create_channel() -> (StreamSender, StreamReceiver) {
68    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
69    let sender = StreamSender { tx };
70    let receiver = StreamReceiver {
71        rx,
72        display: LiveDisplay::new(),
73    };
74    (sender, receiver)
75}
76
77/// Live terminal display for streaming task execution.
78pub struct LiveDisplay {
79    start_time: Instant,
80    current_tool: Option<String>,
81    task_description: String,
82    steps_done: u64,
83    steps_total: u64,
84}
85
86impl LiveDisplay {
87    pub fn new() -> Self {
88        Self {
89            start_time: Instant::now(),
90            current_tool: None,
91            task_description: String::new(),
92            steps_done: 0,
93            steps_total: 0,
94        }
95    }
96
97    /// Update the display with a new event.
98    pub fn update(&mut self, event: &StreamEvent) {
99        match event {
100            StreamEvent::ToolStart { name, description } => {
101                self.current_tool = Some(name.clone());
102                let elapsed = self.start_time.elapsed().as_secs();
103                eprintln!(
104                    "\x1b[36m[{elapsed:>3}s]\x1b[0m \x1b[33m🔧 {name}\x1b[0m — {description}"
105                );
106            }
107            StreamEvent::ToolOutput { name: _, output } => {
108                let preview: String = output
109                    .lines()
110                    .take(3)
111                    .map(|l| format!("  │ {l}"))
112                    .collect::<Vec<_>>()
113                    .join("\n");
114                if !preview.is_empty() {
115                    eprintln!("{preview}");
116                    if output.lines().count() > 3 {
117                        eprintln!(
118                            "  │ \x1b[2m... ({} more lines)\x1b[0m",
119                            output.lines().count() - 3
120                        );
121                    }
122                }
123            }
124            StreamEvent::AgentMessage { content } => {
125                eprintln!("\x1b[35m💬\x1b[0m {content}");
126            }
127            StreamEvent::TaskProgress {
128                current,
129                total,
130                description,
131            } => {
132                self.steps_done = *current;
133                self.steps_total = *total;
134                self.task_description = description.clone();
135                let pct = if *total > 0 {
136                    (current * 100) / total
137                } else {
138                    0
139                };
140                let bar = "â–ˆ".repeat(pct as usize / 5) + &"â–‘".repeat(20 - pct as usize / 5);
141                let elapsed = self.start_time.elapsed().as_secs();
142                eprintln!("\x1b[36m[{elapsed:>3}s]\x1b[0m [{bar}] {pct}% — {description}");
143            }
144            StreamEvent::TaskComplete { summary } => {
145                let elapsed = self.start_time.elapsed().as_secs_f64();
146                eprintln!("\x1b[32m✓\x1b[0m {summary} \x1b[2m({elapsed:.1}s)\x1b[0m");
147            }
148            StreamEvent::Error { message } => {
149                eprintln!("\x1b[31m✗\x1b[0m {message}");
150            }
151        }
152    }
153
154    /// Finish the display and print a summary.
155    pub fn finish(&mut self) {
156        let elapsed = self.start_time.elapsed().as_secs_f64();
157        eprintln!(
158            "\n\x1b[1mDone in {elapsed:.1}s\x1b[0m — {}/{} steps completed",
159            self.steps_done, self.steps_total
160        );
161    }
162
163    /// Get the elapsed time in seconds.
164    pub fn elapsed(&self) -> f64 {
165        self.start_time.elapsed().as_secs_f64()
166    }
167}