rustyclaw_core/messengers/
streaming.rs1use serde::{Deserialize, Serialize};
13use std::time::{Duration, Instant};
14use tracing::debug;
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
18#[serde(rename_all = "snake_case")]
19pub enum StreamStrategy {
20 EditInPlace,
22 Chunked,
24 BufferAll,
26}
27
28impl Default for StreamStrategy {
29 fn default() -> Self {
30 Self::EditInPlace
31 }
32}
33
34#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct StreamConfig {
37 #[serde(default)]
39 pub enabled: bool,
40
41 #[serde(default)]
43 pub strategy: StreamStrategy,
44
45 #[serde(default = "default_edit_interval_ms")]
48 pub edit_interval_ms: u64,
49
50 #[serde(default = "default_chunk_min_chars")]
52 pub chunk_min_chars: usize,
53
54 #[serde(default = "default_max_message_len")]
56 pub max_message_len: usize,
57
58 #[serde(default = "default_true")]
60 pub show_typing: bool,
61
62 #[serde(default = "default_cursor")]
64 pub streaming_cursor: String,
65}
66
67fn default_edit_interval_ms() -> u64 {
68 500
69}
70
71fn default_chunk_min_chars() -> usize {
72 100
73}
74
75fn default_max_message_len() -> usize {
76 4000
77}
78
79fn default_true() -> bool {
80 true
81}
82
83fn default_cursor() -> String {
84 " ▌".to_string()
85}
86
87impl Default for StreamConfig {
88 fn default() -> Self {
89 Self {
90 enabled: false,
91 strategy: StreamStrategy::default(),
92 edit_interval_ms: default_edit_interval_ms(),
93 chunk_min_chars: default_chunk_min_chars(),
94 max_message_len: default_max_message_len(),
95 show_typing: true,
96 streaming_cursor: default_cursor(),
97 }
98 }
99}
100
101pub struct StreamBuffer {
103 content: String,
105 last_flush: Instant,
107 config: StreamConfig,
109 flush_count: usize,
111 done: bool,
113}
114
115impl StreamBuffer {
116 pub fn new(config: StreamConfig) -> Self {
118 Self {
119 content: String::new(),
120 last_flush: Instant::now(),
121 config,
122 flush_count: 0,
123 done: false,
124 }
125 }
126
127 pub fn push(&mut self, text: &str) {
129 self.content.push_str(text);
130 }
131
132 pub fn finish(&mut self) {
134 self.done = true;
135 }
136
137 pub fn should_flush(&self) -> bool {
139 if self.done {
140 return true;
141 }
142
143 let elapsed = self.last_flush.elapsed();
144 let interval = Duration::from_millis(self.config.edit_interval_ms);
145
146 match self.config.strategy {
147 StreamStrategy::EditInPlace => elapsed >= interval && !self.content.is_empty(),
148 StreamStrategy::Chunked => {
149 elapsed >= interval && self.content.len() >= self.config.chunk_min_chars
150 }
151 StreamStrategy::BufferAll => self.done,
152 }
153 }
154
155 pub fn flush(&mut self) -> Option<FlushAction> {
160 if self.content.is_empty() && !self.done {
161 return None;
162 }
163
164 self.last_flush = Instant::now();
165 self.flush_count += 1;
166
167 let action = match self.config.strategy {
168 StreamStrategy::EditInPlace => {
169 let display_text = if self.done {
170 self.content.clone()
171 } else {
172 format!("{}{}", self.content, self.config.streaming_cursor)
173 };
174
175 if self.flush_count == 1 {
176 FlushAction::SendNew(display_text)
177 } else {
178 FlushAction::EditExisting(display_text)
179 }
180 }
181 StreamStrategy::Chunked => {
182 let chunk = std::mem::take(&mut self.content);
183 if chunk.is_empty() {
184 return None;
185 }
186 FlushAction::SendNew(chunk)
187 }
188 StreamStrategy::BufferAll => {
189 if self.done {
190 FlushAction::SendNew(std::mem::take(&mut self.content))
191 } else {
192 return None;
193 }
194 }
195 };
196
197 debug!(
198 strategy = ?self.config.strategy,
199 flush_count = self.flush_count,
200 done = self.done,
201 "Stream buffer flushed"
202 );
203
204 Some(action)
205 }
206
207 pub fn is_done(&self) -> bool {
209 self.done
210 }
211
212 pub fn content_len(&self) -> usize {
214 self.content.len()
215 }
216
217 pub fn needs_split(&self) -> bool {
219 self.content.len() > self.config.max_message_len
220 }
221
222 pub fn split_content(&self) -> Vec<String> {
224 let max_len = self.config.max_message_len;
225 if self.content.len() <= max_len {
226 return vec![self.content.clone()];
227 }
228
229 let mut chunks = Vec::new();
230 let mut remaining = self.content.as_str();
231
232 while !remaining.is_empty() {
233 if remaining.len() <= max_len {
234 chunks.push(remaining.to_string());
235 break;
236 }
237
238 let mut boundary = max_len;
241 while boundary > 0 && !remaining.is_char_boundary(boundary) {
242 boundary -= 1;
243 }
244
245 let split_at = remaining[..boundary]
247 .rfind('\n')
248 .or_else(|| remaining[..boundary].rfind(' '))
249 .unwrap_or(boundary);
250
251 chunks.push(remaining[..split_at].to_string());
252 remaining = remaining[split_at..].trim_start();
253 }
254
255 chunks
256 }
257}
258
259#[derive(Debug, Clone)]
261pub enum FlushAction {
262 SendNew(String),
264 EditExisting(String),
266}
267
268pub fn recommended_strategy(messenger_type: &str) -> StreamStrategy {
270 match messenger_type {
271 "telegram" | "discord" | "slack" => StreamStrategy::EditInPlace,
272 "irc" | "webhook" => StreamStrategy::Chunked,
273 "teams" | "google_chat" => StreamStrategy::EditInPlace,
274 "imessage" => StreamStrategy::BufferAll,
275 _ => StreamStrategy::BufferAll,
276 }
277}
278
279#[cfg(test)]
280mod tests {
281 use super::*;
282
283 #[test]
284 fn test_stream_config_defaults() {
285 let config = StreamConfig::default();
286 assert!(!config.enabled);
287 assert_eq!(config.strategy, StreamStrategy::EditInPlace);
288 assert_eq!(config.edit_interval_ms, 500);
289 assert!(config.show_typing);
290 }
291
292 #[test]
293 fn test_stream_buffer_edit_in_place() {
294 let config = StreamConfig {
295 enabled: true,
296 strategy: StreamStrategy::EditInPlace,
297 edit_interval_ms: 0, ..Default::default()
299 };
300
301 let mut buf = StreamBuffer::new(config);
302 buf.push("Hello ");
303 buf.push("world");
304
305 let action = buf.flush().unwrap();
307 assert!(matches!(action, FlushAction::SendNew(_)));
308
309 buf.push("!");
310 let action = buf.flush().unwrap();
312 assert!(matches!(action, FlushAction::EditExisting(_)));
313 }
314
315 #[test]
316 fn test_stream_buffer_chunked() {
317 let config = StreamConfig {
318 enabled: true,
319 strategy: StreamStrategy::Chunked,
320 edit_interval_ms: 0,
321 chunk_min_chars: 5,
322 ..Default::default()
323 };
324
325 let mut buf = StreamBuffer::new(config);
326 buf.push("Hello");
327
328 let action = buf.flush().unwrap();
329 assert!(matches!(action, FlushAction::SendNew(ref s) if s == "Hello"));
330
331 assert_eq!(buf.content_len(), 0);
333 }
334
335 #[test]
336 fn test_stream_buffer_buffer_all() {
337 let config = StreamConfig {
338 enabled: true,
339 strategy: StreamStrategy::BufferAll,
340 ..Default::default()
341 };
342
343 let mut buf = StreamBuffer::new(config);
344 buf.push("Hello ");
345 buf.push("world");
346
347 assert!(!buf.should_flush());
349
350 buf.finish();
351 assert!(buf.should_flush());
352
353 let action = buf.flush().unwrap();
354 assert!(matches!(action, FlushAction::SendNew(ref s) if s == "Hello world"));
355 }
356
357 #[test]
358 fn test_stream_buffer_cursor() {
359 let config = StreamConfig {
360 enabled: true,
361 strategy: StreamStrategy::EditInPlace,
362 edit_interval_ms: 0,
363 streaming_cursor: " ▌".to_string(),
364 ..Default::default()
365 };
366
367 let mut buf = StreamBuffer::new(config);
368 buf.push("typing...");
369
370 let action = buf.flush().unwrap();
371 if let FlushAction::SendNew(text) = action {
372 assert!(text.ends_with(" ▌"));
373 }
374
375 buf.finish();
376 buf.push(""); let action = buf.flush().unwrap();
378 if let FlushAction::EditExisting(text) = action {
379 assert!(!text.ends_with(" ▌"));
380 }
381 }
382
383 #[test]
384 fn test_split_content() {
385 let config = StreamConfig {
386 max_message_len: 10,
387 ..Default::default()
388 };
389
390 let mut buf = StreamBuffer::new(config);
391 buf.push("Hello world, this is a test");
392
393 let chunks = buf.split_content();
394 assert!(chunks.len() > 1);
395 for chunk in &chunks {
396 assert!(chunk.len() <= 10 || !chunk.contains(' '));
397 }
398 }
399
400 #[test]
401 fn test_recommended_strategy() {
402 assert_eq!(
403 recommended_strategy("telegram"),
404 StreamStrategy::EditInPlace
405 );
406 assert_eq!(recommended_strategy("irc"), StreamStrategy::Chunked);
407 assert_eq!(recommended_strategy("imessage"), StreamStrategy::BufferAll);
408 assert_eq!(
409 recommended_strategy("unknown"),
410 StreamStrategy::BufferAll
411 );
412 }
413}