Skip to main content

rustyclaw_core/messengers/
streaming.rs

1//! Streaming support for messenger channels.
2//!
3//! Provides utilities for streaming model responses to messenger channels
4//! in real-time, rather than waiting for the full response. Different
5//! messengers support different streaming strategies:
6//!
7//! - **Edit-based**: Send an initial message, then edit it as tokens arrive
8//!   (Telegram, Discord, Slack).
9//! - **Chunked**: Send partial messages at intervals (IRC, generic).
10//! - **Draft**: Use platform-specific draft/typing APIs (where available).
11
12use serde::{Deserialize, Serialize};
13use std::time::{Duration, Instant};
14use tracing::debug;
15
16/// Streaming strategy for a messenger.
17#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
18#[serde(rename_all = "snake_case")]
19pub enum StreamStrategy {
20    /// Send initial message, then edit in place as tokens arrive.
21    EditInPlace,
22    /// Accumulate tokens and send chunks at intervals.
23    Chunked,
24    /// Wait for full response before sending (no streaming).
25    BufferAll,
26}
27
28impl Default for StreamStrategy {
29    fn default() -> Self {
30        Self::EditInPlace
31    }
32}
33
34/// Configuration for messenger streaming.
35#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct StreamConfig {
37    /// Whether streaming is enabled for this messenger.
38    #[serde(default)]
39    pub enabled: bool,
40
41    /// Streaming strategy.
42    #[serde(default)]
43    pub strategy: StreamStrategy,
44
45    /// Minimum interval between message edits (milliseconds).
46    /// Prevents rate-limiting from too-frequent edits.
47    #[serde(default = "default_edit_interval_ms")]
48    pub edit_interval_ms: u64,
49
50    /// For chunked strategy: minimum characters before sending a chunk.
51    #[serde(default = "default_chunk_min_chars")]
52    pub chunk_min_chars: usize,
53
54    /// Maximum message length before splitting into multiple messages.
55    #[serde(default = "default_max_message_len")]
56    pub max_message_len: usize,
57
58    /// Whether to show a typing indicator while generating.
59    #[serde(default = "default_true")]
60    pub show_typing: bool,
61
62    /// Suffix to append while streaming is in progress (e.g., " ▌").
63    #[serde(default = "default_cursor")]
64    pub streaming_cursor: String,
65}
66
67fn default_edit_interval_ms() -> u64 {
68    500
69}
70
71fn default_chunk_min_chars() -> usize {
72    100
73}
74
75fn default_max_message_len() -> usize {
76    4000
77}
78
79fn default_true() -> bool {
80    true
81}
82
83fn default_cursor() -> String {
84    " ▌".to_string()
85}
86
87impl Default for StreamConfig {
88    fn default() -> Self {
89        Self {
90            enabled: false,
91            strategy: StreamStrategy::default(),
92            edit_interval_ms: default_edit_interval_ms(),
93            chunk_min_chars: default_chunk_min_chars(),
94            max_message_len: default_max_message_len(),
95            show_typing: true,
96            streaming_cursor: default_cursor(),
97        }
98    }
99}
100
101/// Buffer that accumulates streaming tokens and decides when to flush.
102pub struct StreamBuffer {
103    /// Accumulated text.
104    content: String,
105    /// Last time a flush (edit/send) was performed.
106    last_flush: Instant,
107    /// Configuration.
108    config: StreamConfig,
109    /// Number of flushes performed.
110    flush_count: usize,
111    /// Whether the stream is complete.
112    done: bool,
113}
114
115impl StreamBuffer {
116    /// Create a new stream buffer.
117    pub fn new(config: StreamConfig) -> Self {
118        Self {
119            content: String::new(),
120            last_flush: Instant::now(),
121            config,
122            flush_count: 0,
123            done: false,
124        }
125    }
126
127    /// Add a text chunk to the buffer.
128    pub fn push(&mut self, text: &str) {
129        self.content.push_str(text);
130    }
131
132    /// Mark the stream as complete.
133    pub fn finish(&mut self) {
134        self.done = true;
135    }
136
137    /// Check if a flush is needed based on strategy and timing.
138    pub fn should_flush(&self) -> bool {
139        if self.done {
140            return true;
141        }
142
143        let elapsed = self.last_flush.elapsed();
144        let interval = Duration::from_millis(self.config.edit_interval_ms);
145
146        match self.config.strategy {
147            StreamStrategy::EditInPlace => elapsed >= interval && !self.content.is_empty(),
148            StreamStrategy::Chunked => {
149                elapsed >= interval && self.content.len() >= self.config.chunk_min_chars
150            }
151            StreamStrategy::BufferAll => self.done,
152        }
153    }
154
155    /// Get the current content to send/edit.
156    ///
157    /// For EditInPlace: returns full accumulated content with cursor.
158    /// For Chunked: returns the pending chunk and clears the buffer.
159    pub fn flush(&mut self) -> Option<FlushAction> {
160        if self.content.is_empty() && !self.done {
161            return None;
162        }
163
164        self.last_flush = Instant::now();
165        self.flush_count += 1;
166
167        let action = match self.config.strategy {
168            StreamStrategy::EditInPlace => {
169                let display_text = if self.done {
170                    self.content.clone()
171                } else {
172                    format!("{}{}", self.content, self.config.streaming_cursor)
173                };
174
175                if self.flush_count == 1 {
176                    FlushAction::SendNew(display_text)
177                } else {
178                    FlushAction::EditExisting(display_text)
179                }
180            }
181            StreamStrategy::Chunked => {
182                let chunk = std::mem::take(&mut self.content);
183                if chunk.is_empty() {
184                    return None;
185                }
186                FlushAction::SendNew(chunk)
187            }
188            StreamStrategy::BufferAll => {
189                if self.done {
190                    FlushAction::SendNew(std::mem::take(&mut self.content))
191                } else {
192                    return None;
193                }
194            }
195        };
196
197        debug!(
198            strategy = ?self.config.strategy,
199            flush_count = self.flush_count,
200            done = self.done,
201            "Stream buffer flushed"
202        );
203
204        Some(action)
205    }
206
207    /// Check if streaming is complete.
208    pub fn is_done(&self) -> bool {
209        self.done
210    }
211
212    /// Get current content length.
213    pub fn content_len(&self) -> usize {
214        self.content.len()
215    }
216
217    /// Check if content exceeds max message length and needs splitting.
218    pub fn needs_split(&self) -> bool {
219        self.content.len() > self.config.max_message_len
220    }
221
222    /// Split content into message-sized chunks.
223    pub fn split_content(&self) -> Vec<String> {
224        let max_len = self.config.max_message_len;
225        if self.content.len() <= max_len {
226            return vec![self.content.clone()];
227        }
228
229        let mut chunks = Vec::new();
230        let mut remaining = self.content.as_str();
231
232        while !remaining.is_empty() {
233            if remaining.len() <= max_len {
234                chunks.push(remaining.to_string());
235                break;
236            }
237
238            // Find a char boundary at or before max_len to avoid
239            // panicking when max_len falls inside a multi-byte codepoint.
240            let mut boundary = max_len;
241            while boundary > 0 && !remaining.is_char_boundary(boundary) {
242                boundary -= 1;
243            }
244
245            // Try to split at a newline or space within the boundary
246            let split_at = remaining[..boundary]
247                .rfind('\n')
248                .or_else(|| remaining[..boundary].rfind(' '))
249                .unwrap_or(boundary);
250
251            chunks.push(remaining[..split_at].to_string());
252            remaining = remaining[split_at..].trim_start();
253        }
254
255        chunks
256    }
257}
258
259/// Action to perform after flushing the stream buffer.
260#[derive(Debug, Clone)]
261pub enum FlushAction {
262    /// Send a new message.
263    SendNew(String),
264    /// Edit the previously sent message.
265    EditExisting(String),
266}
267
268/// Get the recommended stream strategy for a messenger type.
269pub fn recommended_strategy(messenger_type: &str) -> StreamStrategy {
270    match messenger_type {
271        "telegram" | "discord" | "slack" => StreamStrategy::EditInPlace,
272        "irc" | "webhook" => StreamStrategy::Chunked,
273        "teams" | "google_chat" => StreamStrategy::EditInPlace,
274        "imessage" => StreamStrategy::BufferAll,
275        _ => StreamStrategy::BufferAll,
276    }
277}
278
279#[cfg(test)]
280mod tests {
281    use super::*;
282
283    #[test]
284    fn test_stream_config_defaults() {
285        let config = StreamConfig::default();
286        assert!(!config.enabled);
287        assert_eq!(config.strategy, StreamStrategy::EditInPlace);
288        assert_eq!(config.edit_interval_ms, 500);
289        assert!(config.show_typing);
290    }
291
292    #[test]
293    fn test_stream_buffer_edit_in_place() {
294        let config = StreamConfig {
295            enabled: true,
296            strategy: StreamStrategy::EditInPlace,
297            edit_interval_ms: 0, // immediate
298            ..Default::default()
299        };
300
301        let mut buf = StreamBuffer::new(config);
302        buf.push("Hello ");
303        buf.push("world");
304
305        // First flush should be SendNew
306        let action = buf.flush().unwrap();
307        assert!(matches!(action, FlushAction::SendNew(_)));
308
309        buf.push("!");
310        // Subsequent flush should be EditExisting
311        let action = buf.flush().unwrap();
312        assert!(matches!(action, FlushAction::EditExisting(_)));
313    }
314
315    #[test]
316    fn test_stream_buffer_chunked() {
317        let config = StreamConfig {
318            enabled: true,
319            strategy: StreamStrategy::Chunked,
320            edit_interval_ms: 0,
321            chunk_min_chars: 5,
322            ..Default::default()
323        };
324
325        let mut buf = StreamBuffer::new(config);
326        buf.push("Hello");
327
328        let action = buf.flush().unwrap();
329        assert!(matches!(action, FlushAction::SendNew(ref s) if s == "Hello"));
330
331        // Buffer should be cleared after chunked flush
332        assert_eq!(buf.content_len(), 0);
333    }
334
335    #[test]
336    fn test_stream_buffer_buffer_all() {
337        let config = StreamConfig {
338            enabled: true,
339            strategy: StreamStrategy::BufferAll,
340            ..Default::default()
341        };
342
343        let mut buf = StreamBuffer::new(config);
344        buf.push("Hello ");
345        buf.push("world");
346
347        // Should not flush until done
348        assert!(!buf.should_flush());
349
350        buf.finish();
351        assert!(buf.should_flush());
352
353        let action = buf.flush().unwrap();
354        assert!(matches!(action, FlushAction::SendNew(ref s) if s == "Hello world"));
355    }
356
357    #[test]
358    fn test_stream_buffer_cursor() {
359        let config = StreamConfig {
360            enabled: true,
361            strategy: StreamStrategy::EditInPlace,
362            edit_interval_ms: 0,
363            streaming_cursor: " ▌".to_string(),
364            ..Default::default()
365        };
366
367        let mut buf = StreamBuffer::new(config);
368        buf.push("typing...");
369
370        let action = buf.flush().unwrap();
371        if let FlushAction::SendNew(text) = action {
372            assert!(text.ends_with(" ▌"));
373        }
374
375        buf.finish();
376        buf.push(""); // trigger final state
377        let action = buf.flush().unwrap();
378        if let FlushAction::EditExisting(text) = action {
379            assert!(!text.ends_with(" ▌"));
380        }
381    }
382
383    #[test]
384    fn test_split_content() {
385        let config = StreamConfig {
386            max_message_len: 10,
387            ..Default::default()
388        };
389
390        let mut buf = StreamBuffer::new(config);
391        buf.push("Hello world, this is a test");
392
393        let chunks = buf.split_content();
394        assert!(chunks.len() > 1);
395        for chunk in &chunks {
396            assert!(chunk.len() <= 10 || !chunk.contains(' '));
397        }
398    }
399
400    #[test]
401    fn test_recommended_strategy() {
402        assert_eq!(
403            recommended_strategy("telegram"),
404            StreamStrategy::EditInPlace
405        );
406        assert_eq!(recommended_strategy("irc"), StreamStrategy::Chunked);
407        assert_eq!(recommended_strategy("imessage"), StreamStrategy::BufferAll);
408        assert_eq!(
409            recommended_strategy("unknown"),
410            StreamStrategy::BufferAll
411        );
412    }
413}