chat-system 0.1.3

A multi-protocol async chat crate — single interface for IRC, Matrix, Discord, Telegram, Slack, Signal, WhatsApp, and more
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
//! Streaming support for messenger channels.
//!
//! Provides utilities for streaming model responses to messenger channels
//! in real-time, rather than waiting for the full response. Different
//! messengers support different streaming strategies:
//!
//! - **Edit-based**: Send an initial message, then edit it as tokens arrive
//!   (Telegram, Discord, Slack).
//! - **Chunked**: Send partial messages at intervals (IRC, generic).
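//! - **Draft**: Use platform-specific draft/typing APIs (where available).
//! - **Buffered**: Wait for the full response and send it as a single
//!   message (used when a messenger has no better streaming option).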

use serde::{Deserialize, Serialize};
use std::time::{Duration, Instant};
use tracing::debug;

/// Streaming strategy for a messenger.
///
/// Selects how [`StreamBuffer`] turns accumulated text into
/// [`FlushAction`]s. Variants are (de)serialized in `snake_case`
/// (`edit_in_place`, `chunked`, `buffer_all`), and
/// [`StreamStrategy::EditInPlace`] is the `Default`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum StreamStrategy {
    /// Send initial message, then edit in place as tokens arrive.
    ///
    /// The first flush yields a new message; later flushes yield edits
    /// carrying the full accumulated text.
    #[default]
    EditInPlace,
    /// Accumulate tokens and send chunks at intervals.
    ///
    /// Every flush yields a new message and empties the buffer.
    Chunked,
    /// Wait for full response before sending (no streaming).
    ///
    /// Nothing is emitted until the stream has been marked finished.
    BufferAll,
}

/// Configuration for messenger streaming.
///
/// Every field has a serde default, so a partial (or empty) configuration
/// deserializes successfully. The same values are produced by
/// `StreamConfig::default()`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamConfig {
    /// Whether streaming is enabled for this messenger.
    ///
    /// Defaults to `false`. `StreamBuffer` in this module does not read this
    /// flag; it is up to the caller to honour it.
    #[serde(default)]
    pub enabled: bool,

    /// Streaming strategy.
    ///
    /// Defaults to `StreamStrategy::EditInPlace`.
    #[serde(default)]
    pub strategy: StreamStrategy,

    /// Minimum interval between message edits (milliseconds).
    /// Prevents rate-limiting from too-frequent edits.
    ///
    /// Also gates chunk sends for the chunked strategy. Defaults to 500.
    #[serde(default = "default_edit_interval_ms")]
    pub edit_interval_ms: u64,

    /// For chunked strategy: minimum characters before sending a chunk.
    ///
    /// Compared against the buffer's byte length (`String::len`), so
    /// multi-byte characters count more than once. Defaults to 100.
    #[serde(default = "default_chunk_min_chars")]
    pub chunk_min_chars: usize,

    /// Maximum message length before splitting into multiple messages.
    ///
    /// Measured in bytes of UTF-8, not characters. Defaults to 4000.
    #[serde(default = "default_max_message_len")]
    pub max_message_len: usize,

    /// Whether to show a typing indicator while generating.
    ///
    /// Defaults to `true`. Not read by `StreamBuffer`; the caller decides
    /// how to act on it.
    #[serde(default = "default_true")]
    pub show_typing: bool,

    /// Suffix to append while streaming is in progress (e.g., " ▌").
    ///
    /// Only used by the edit-in-place strategy, and only until the stream
    /// is finished. Defaults to the empty string (no cursor).
    #[serde(default = "default_cursor")]
    pub streaming_cursor: String,
}

/// Serde fallback for `StreamConfig::edit_interval_ms`: 500 ms between
/// consecutive edits/sends.
fn default_edit_interval_ms() -> u64 {
    const EDIT_INTERVAL_MS: u64 = 500;
    EDIT_INTERVAL_MS
}

/// Serde fallback for `StreamConfig::chunk_min_chars`: a chunk must reach
/// a length of 100 before it is sent.
fn default_chunk_min_chars() -> usize {
    const CHUNK_MIN_CHARS: usize = 100;
    CHUNK_MIN_CHARS
}

/// Serde fallback for `StreamConfig::max_message_len`: messages longer
/// than 4000 are split.
fn default_max_message_len() -> usize {
    const MAX_MESSAGE_LEN: usize = 4000;
    MAX_MESSAGE_LEN
}

/// Serde fallback for boolean options that are switched on unless the
/// configuration says otherwise.
fn default_true() -> bool {
    const ON_BY_DEFAULT: bool = true;
    ON_BY_DEFAULT
}

/// Serde fallback for `StreamConfig::streaming_cursor`.
///
/// Returns an empty string, i.e. no cursor suffix is appended unless one is
/// configured explicitly.
fn default_cursor() -> String {
    String::new()
}

impl Default for StreamConfig {
    /// Build the configuration used when nothing is specified.
    ///
    /// Streaming is disabled, the typing indicator is on, and every other
    /// field takes the same value as its serde default function.
    fn default() -> Self {
        let strategy = StreamStrategy::default();
        let edit_interval_ms = default_edit_interval_ms();
        let chunk_min_chars = default_chunk_min_chars();
        let max_message_len = default_max_message_len();
        let streaming_cursor = default_cursor();

        Self {
            enabled: false,
            strategy,
            edit_interval_ms,
            chunk_min_chars,
            max_message_len,
            show_typing: true,
            streaming_cursor,
        }
    }
}

/// Buffer that accumulates streaming tokens and decides when to flush.
///
/// Text is appended with `push`, the end of the stream is signalled with
/// `finish`, and `should_flush` / `flush` tell the caller when and what to
/// send according to the configured strategy. The buffer performs no I/O
/// itself.
pub struct StreamBuffer {
    /// Accumulated text.
    /// Kept in full for the edit-in-place strategy; moved out of the buffer
    /// on flush by the chunked and buffer-all strategies.
    content: String,
    /// Last time a flush (edit/send) was performed.
    /// Initialised to the construction time.
    last_flush: Instant,
    /// Configuration.
    config: StreamConfig,
    /// Number of flushes performed.
    /// Used to tell the first edit-in-place flush (new message) from later
    /// ones (edits).
    flush_count: usize,
    /// Whether the stream is complete.
    done: bool,
}

impl StreamBuffer {
    /// Create an empty stream buffer governed by `config`.
    ///
    /// The flush timer starts at construction, so the first interval-based
    /// flush becomes due `edit_interval_ms` after this call.
    pub fn new(config: StreamConfig) -> Self {
        Self {
            content: String::new(),
            last_flush: Instant::now(),
            config,
            flush_count: 0,
            done: false,
        }
    }

    /// Add a text chunk to the buffer.
    pub fn push(&mut self, text: &str) {
        self.content.push_str(text);
    }

    /// Mark the stream as complete.
    ///
    /// After this call [`should_flush`](Self::should_flush) always returns
    /// `true` and the edit-in-place strategy stops appending the cursor.
    pub fn finish(&mut self) {
        self.done = true;
    }

    /// Check if a flush is needed based on strategy and timing.
    ///
    /// * Finished stream: always `true` (note that [`flush`](Self::flush)
    ///   may still return `None` if nothing is left to send).
    /// * `EditInPlace`: the edit interval has elapsed and there is content.
    /// * `Chunked`: the edit interval has elapsed and at least
    ///   `chunk_min_chars` bytes are buffered.
    /// * `BufferAll`: never before the stream is finished.
    pub fn should_flush(&self) -> bool {
        if self.done {
            return true;
        }

        let interval = Duration::from_millis(self.config.edit_interval_ms);
        let interval_elapsed = self.last_flush.elapsed() >= interval;

        match self.config.strategy {
            StreamStrategy::EditInPlace => interval_elapsed && !self.content.is_empty(),
            StreamStrategy::Chunked => {
                interval_elapsed && self.content.len() >= self.config.chunk_min_chars
            }
            // Unfinished streams are never flushed when buffering everything;
            // the finished case was handled by the early return above.
            StreamStrategy::BufferAll => false,
        }
    }

    /// Get the current content to send/edit.
    ///
    /// * `EditInPlace`: returns the full accumulated content (plus the
    ///   streaming cursor while unfinished) as `SendNew` on the first flush
    ///   and `EditExisting` afterwards. The buffer is kept.
    /// * `Chunked`: returns the pending chunk as `SendNew` and clears the
    ///   buffer.
    /// * `BufferAll`: returns the whole content as `SendNew` once the stream
    ///   is finished, clearing the buffer; `None` before that.
    ///
    /// Returns `None` whenever there is no text to deliver, so an empty
    /// message is never produced. The flush timestamp and counter are only
    /// updated when an action is actually returned.
    pub fn flush(&mut self) -> Option<FlushAction> {
        // Nothing buffered means nothing to send, finished or not.
        if self.content.is_empty() {
            return None;
        }

        let action = match self.config.strategy {
            StreamStrategy::EditInPlace => {
                let display_text = if self.done {
                    self.content.clone()
                } else {
                    format!("{}{}", self.content, self.config.streaming_cursor)
                };

                // `flush_count` still holds the number of *previous*
                // flushes here, so zero identifies the very first message.
                if self.flush_count == 0 {
                    FlushAction::SendNew(display_text)
                } else {
                    FlushAction::EditExisting(display_text)
                }
            }
            StreamStrategy::Chunked => FlushAction::SendNew(std::mem::take(&mut self.content)),
            StreamStrategy::BufferAll => {
                if !self.done {
                    return None;
                }
                FlushAction::SendNew(std::mem::take(&mut self.content))
            }
        };

        // Record the flush only now that we know one really happens.
        self.last_flush = Instant::now();
        self.flush_count += 1;

        debug!(
            strategy = ?self.config.strategy,
            flush_count = self.flush_count,
            done = self.done,
            "Stream buffer flushed"
        );

        Some(action)
    }

    /// Check if streaming is complete.
    pub fn is_done(&self) -> bool {
        self.done
    }

    /// Get current content length in bytes.
    pub fn content_len(&self) -> usize {
        self.content.len()
    }

    /// Check if content exceeds max message length and needs splitting.
    ///
    /// The comparison uses the byte length of the buffered text.
    pub fn needs_split(&self) -> bool {
        self.content.len() > self.config.max_message_len
    }

    /// Split content into message-sized chunks.
    ///
    /// Content that already fits is returned as a single chunk (including
    /// the empty string for an empty buffer). Otherwise each chunk is at
    /// most `max_message_len` bytes and is cut, in order of preference, at
    /// the last newline, the last space, or the last character boundary
    /// inside that window. Whitespace at the cut is dropped from the start
    /// of the following chunk.
    ///
    /// If `max_message_len` is smaller than a single character (including
    /// `0`), that character is emitted on its own: exceeding the limit is
    /// preferable to looping forever or slicing inside a code point.
    pub fn split_content(&self) -> Vec<String> {
        let max_len = self.config.max_message_len;
        if self.content.len() <= max_len {
            return vec![self.content.clone()];
        }

        let mut chunks = Vec::new();
        let mut remaining = self.content.as_str();

        while !remaining.is_empty() {
            if remaining.len() <= max_len {
                chunks.push(remaining.to_string());
                break;
            }

            // Find a char boundary at or before max_len to avoid
            // panicking when max_len falls inside a multi-byte codepoint.
            let mut boundary = max_len;
            while boundary > 0 && !remaining.is_char_boundary(boundary) {
                boundary -= 1;
            }
            if boundary == 0 {
                // The limit cannot hold even the first character; take that
                // one character whole so every iteration makes progress.
                boundary = remaining
                    .chars()
                    .next()
                    .map_or(remaining.len(), char::len_utf8);
            }

            // Try to split at a newline or space within the boundary. A
            // match at index 0 would yield an empty chunk, so it is ignored.
            let window = &remaining[..boundary];
            let split_at = window
                .rfind('\n')
                .filter(|&idx| idx > 0)
                .or_else(|| window.rfind(' ').filter(|&idx| idx > 0))
                .unwrap_or(boundary);

            chunks.push(remaining[..split_at].to_string());
            remaining = remaining[split_at..].trim_start();
        }

        chunks
    }
}

/// Action to perform after flushing the stream buffer.
///
/// Produced by `StreamBuffer::flush`; the caller is responsible for
/// carrying it out against the messenger.
#[derive(Debug, Clone)]
pub enum FlushAction {
    /// Send a new message.
    /// The payload is the text of that message.
    SendNew(String),
    /// Edit the previously sent message.
    /// The payload is the complete replacement text, not a delta.
    EditExisting(String),
}

/// Get the recommended stream strategy for a messenger type.
///
/// `messenger_type` is matched exactly (case-sensitive):
///
/// * `telegram`, `discord`, `slack`, `teams`, `google_chat` → edit in place
/// * `irc`, `webhook` → chunked
/// * anything else, including `imessage` → buffer the whole response
pub fn recommended_strategy(messenger_type: &str) -> StreamStrategy {
    match messenger_type {
        "telegram" | "discord" | "slack" | "teams" | "google_chat" => StreamStrategy::EditInPlace,
        "irc" | "webhook" => StreamStrategy::Chunked,
        // "imessage" and every unrecognised messenger wait for the full
        // response before sending.
        _ => StreamStrategy::BufferAll,
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Visible cursor used where a test needs to tell streaming output from
    /// final output. An empty cursor cannot do that, because every string
    /// ends with "".
    const CURSOR: &str = " ▌";

    /// `StreamConfig::default()` exposes the documented default values.
    #[test]
    fn test_stream_config_defaults() {
        let config = StreamConfig::default();
        assert!(!config.enabled);
        assert_eq!(config.strategy, StreamStrategy::EditInPlace);
        assert_eq!(config.edit_interval_ms, 500);
        assert_eq!(config.chunk_min_chars, 100);
        assert_eq!(config.max_message_len, 4000);
        assert!(config.show_typing);
    }

    /// Edit-in-place sends one new message and edits it afterwards.
    #[test]
    fn test_stream_buffer_edit_in_place() {
        let config = StreamConfig {
            enabled: true,
            strategy: StreamStrategy::EditInPlace,
            edit_interval_ms: 0, // immediate
            ..Default::default()
        };

        let mut buf = StreamBuffer::new(config);
        buf.push("Hello ");
        buf.push("world");

        // First flush should be SendNew
        let action = buf.flush().unwrap();
        assert!(matches!(action, FlushAction::SendNew(ref s) if s.starts_with("Hello world")));

        buf.push("!");
        // Subsequent flush should be EditExisting
        let action = buf.flush().unwrap();
        assert!(matches!(action, FlushAction::EditExisting(ref s) if s.starts_with("Hello world!")));
    }

    /// Chunked flushes hand out the pending text and empty the buffer.
    #[test]
    fn test_stream_buffer_chunked() {
        let config = StreamConfig {
            enabled: true,
            strategy: StreamStrategy::Chunked,
            edit_interval_ms: 0,
            chunk_min_chars: 5,
            ..Default::default()
        };

        let mut buf = StreamBuffer::new(config);
        buf.push("Hello");

        let action = buf.flush().unwrap();
        assert!(matches!(action, FlushAction::SendNew(ref s) if s == "Hello"));

        // Buffer should be cleared after chunked flush
        assert_eq!(buf.content_len(), 0);
    }

    /// Buffer-all holds everything back until the stream is finished.
    #[test]
    fn test_stream_buffer_buffer_all() {
        let config = StreamConfig {
            enabled: true,
            strategy: StreamStrategy::BufferAll,
            ..Default::default()
        };

        let mut buf = StreamBuffer::new(config);
        buf.push("Hello ");
        buf.push("world");

        // Should not flush until done
        assert!(!buf.should_flush());
        assert!(!buf.is_done());

        buf.finish();
        assert!(buf.is_done());
        assert!(buf.should_flush());

        let action = buf.flush().unwrap();
        assert!(matches!(action, FlushAction::SendNew(ref s) if s == "Hello world"));
    }

    /// The cursor is appended while streaming and removed by the final edit.
    #[test]
    fn test_stream_buffer_cursor() {
        let config = StreamConfig {
            enabled: true,
            strategy: StreamStrategy::EditInPlace,
            edit_interval_ms: 0,
            streaming_cursor: CURSOR.to_string(),
            ..Default::default()
        };

        let mut buf = StreamBuffer::new(config);
        buf.push("typing...");

        // While streaming, the message carries the cursor suffix.
        match buf.flush().unwrap() {
            FlushAction::SendNew(text) => {
                assert_eq!(text, format!("typing...{}", CURSOR));
            }
            other => panic!("expected SendNew, got {:?}", other),
        }

        // Once finished, the final edit contains the bare text.
        buf.finish();
        match buf.flush().unwrap() {
            FlushAction::EditExisting(text) => {
                assert_eq!(text, "typing...");
                assert!(!text.ends_with(CURSOR));
            }
            other => panic!("expected EditExisting, got {:?}", other),
        }
    }

    /// Over-long content is cut at spaces into chunks within the limit.
    #[test]
    fn test_split_content() {
        let config = StreamConfig {
            max_message_len: 10,
            ..Default::default()
        };

        let mut buf = StreamBuffer::new(config);
        buf.push("Hello world, this is a test");
        assert!(buf.needs_split());

        let chunks = buf.split_content();
        assert_eq!(chunks, vec!["Hello", "world,", "this is a", "test"]);
        for chunk in &chunks {
            assert!(chunk.len() <= 10);
        }
    }

    /// Splitting never cuts through a multi-byte character.
    #[test]
    fn test_split_content_multibyte() {
        let config = StreamConfig {
            max_message_len: 4,
            ..Default::default()
        };

        let mut buf = StreamBuffer::new(config);
        buf.push("日本語日本語");

        let chunks = buf.split_content();
        assert_eq!(chunks.concat(), "日本語日本語");
        for chunk in &chunks {
            assert!(!chunk.is_empty());
            assert!(chunk.len() <= 4);
        }
    }

    /// Known messengers map to their strategy; unknown ones buffer.
    #[test]
    fn test_recommended_strategy() {
        assert_eq!(
            recommended_strategy("telegram"),
            StreamStrategy::EditInPlace
        );
        assert_eq!(recommended_strategy("teams"), StreamStrategy::EditInPlace);
        assert_eq!(recommended_strategy("irc"), StreamStrategy::Chunked);
        assert_eq!(recommended_strategy("imessage"), StreamStrategy::BufferAll);
        assert_eq!(recommended_strategy("unknown"), StreamStrategy::BufferAll);
    }
}