orchflow_terminal/
buffer.rs

1// Terminal Buffer Management
2//
3// Handles output buffering, scrollback history, and efficient
4// data management for terminal streams.
5
6use bytes::Bytes;
7use std::collections::VecDeque;
8use std::sync::Arc;
9use tokio::sync::RwLock;
10
/// Default cap on the number of scrollback lines held in memory.
const DEFAULT_MAX_SCROLLBACK: usize = 10_000;

/// Upper bound, in bytes, on a single output chunk (64 KiB).
const MAX_CHUNK_SIZE: usize = 1 << 16;
16
17/// Output buffer for efficient batching
18pub struct OutputBuffer {
19    buffer: Vec<u8>,
20    max_size: usize,
21    flush_interval: tokio::time::Duration,
22    last_flush: tokio::time::Instant,
23}
24
25impl OutputBuffer {
26    pub fn new(max_size: usize) -> Self {
27        Self {
28            buffer: Vec::with_capacity(max_size),
29            max_size,
30            flush_interval: tokio::time::Duration::from_millis(16), // ~60fps
31            last_flush: tokio::time::Instant::now(),
32        }
33    }
34
35    /// Create with default max chunk size
36    pub fn with_default_size() -> Self {
37        Self::new(MAX_CHUNK_SIZE)
38    }
39
40    /// Add data to buffer
41    pub fn push(&mut self, data: &[u8]) -> Option<Bytes> {
42        // If data is larger than max chunk size, split it
43        if data.len() > MAX_CHUNK_SIZE {
44            let mut result = Vec::new();
45            for chunk in data.chunks(MAX_CHUNK_SIZE) {
46                self.buffer.extend_from_slice(chunk);
47                if self.should_flush() {
48                    result.push(self.flush());
49                }
50            }
51            if result.is_empty() {
52                None
53            } else {
54                // Concatenate all chunks
55                let total_len = result.iter().map(|b| b.len()).sum();
56                let mut combined = Vec::with_capacity(total_len);
57                for bytes in result {
58                    combined.extend_from_slice(&bytes);
59                }
60                Some(Bytes::from(combined))
61            }
62        } else {
63            self.buffer.extend_from_slice(data);
64
65            // Check if we should flush
66            if self.should_flush() {
67                Some(self.flush())
68            } else {
69                None
70            }
71        }
72    }
73
74    /// Check if buffer should be flushed
75    pub fn should_flush(&self) -> bool {
76        self.buffer.len() >= self.max_size || self.last_flush.elapsed() >= self.flush_interval
77    }
78
79    /// Flush the buffer
80    pub fn flush(&mut self) -> Bytes {
81        let data = Bytes::from(self.buffer.clone());
82        self.buffer.clear();
83        self.last_flush = tokio::time::Instant::now();
84        data
85    }
86
87    /// Force flush if there's any data
88    pub fn force_flush(&mut self) -> Option<Bytes> {
89        if !self.buffer.is_empty() {
90            Some(self.flush())
91        } else {
92            None
93        }
94    }
95}
96
97/// Scrollback buffer for terminal history
98pub struct ScrollbackBuffer {
99    lines: Arc<RwLock<VecDeque<ScrollbackLine>>>,
100    max_lines: usize,
101    total_size: Arc<RwLock<usize>>,
102    max_total_size: usize,
103}
104
105#[derive(Clone)]
106pub struct ScrollbackLine {
107    pub content: Bytes,
108    pub timestamp: chrono::DateTime<chrono::Utc>,
109    pub line_number: usize,
110}
111
112impl ScrollbackBuffer {
113    pub fn new(max_lines: usize) -> Self {
114        Self {
115            lines: Arc::new(RwLock::new(VecDeque::with_capacity(max_lines))),
116            max_lines,
117            total_size: Arc::new(RwLock::new(0)),
118            max_total_size: 10 * 1024 * 1024, // 10MB default
119        }
120    }
121
122    /// Create with default scrollback size
123    pub fn with_default_size() -> Self {
124        Self::new(DEFAULT_MAX_SCROLLBACK)
125    }
126
127    /// Add output to scrollback
128    pub async fn add_output(&self, data: &[u8]) {
129        let mut current_line = Vec::new();
130        let mut lines = self.lines.write().await;
131        let mut total_size = self.total_size.write().await;
132
133        // Split data into lines
134        for &byte in data {
135            current_line.push(byte);
136
137            if byte == b'\n' {
138                // Complete line
139                let line = ScrollbackLine {
140                    content: Bytes::from(current_line.clone()),
141                    timestamp: chrono::Utc::now(),
142                    line_number: lines.len(),
143                };
144
145                *total_size += line.content.len();
146                lines.push_back(line);
147                current_line.clear();
148
149                // Trim if needed
150                while lines.len() > self.max_lines || *total_size > self.max_total_size {
151                    if let Some(removed) = lines.pop_front() {
152                        *total_size -= removed.content.len();
153                    }
154                }
155            }
156        }
157
158        // Handle incomplete line
159        if !current_line.is_empty() {
160            // If we have an incomplete line at the end, append to last line
161            if let Some(last_line) = lines.back_mut() {
162                let mut combined = last_line.content.to_vec();
163                combined.extend_from_slice(&current_line);
164                *total_size -= last_line.content.len();
165                last_line.content = Bytes::from(combined);
166                *total_size += last_line.content.len();
167            } else {
168                // First line
169                let line = ScrollbackLine {
170                    content: Bytes::from(current_line),
171                    timestamp: chrono::Utc::now(),
172                    line_number: 0,
173                };
174                *total_size += line.content.len();
175                lines.push_back(line);
176            }
177        }
178    }
179
180    /// Get lines from scrollback
181    pub async fn get_lines(&self, start: usize, count: usize) -> Vec<ScrollbackLine> {
182        let lines = self.lines.read().await;
183        lines.iter().skip(start).take(count).cloned().collect()
184    }
185
186    /// Get last N lines
187    pub async fn get_last_lines(&self, count: usize) -> Vec<ScrollbackLine> {
188        let lines = self.lines.read().await;
189        let start = lines.len().saturating_sub(count);
190        lines.iter().skip(start).cloned().collect()
191    }
192
193    /// Search scrollback
194    pub async fn search(
195        &self,
196        pattern: &str,
197        case_sensitive: bool,
198    ) -> Vec<(usize, ScrollbackLine)> {
199        let lines = self.lines.read().await;
200        let pattern = if case_sensitive {
201            pattern.to_string()
202        } else {
203            pattern.to_lowercase()
204        };
205
206        lines
207            .iter()
208            .enumerate()
209            .filter_map(|(idx, line)| {
210                let content = String::from_utf8_lossy(&line.content);
211                let content_to_search = if case_sensitive {
212                    content.to_string()
213                } else {
214                    content.to_lowercase()
215                };
216
217                if content_to_search.contains(&pattern) {
218                    Some((idx, line.clone()))
219                } else {
220                    None
221                }
222            })
223            .collect()
224    }
225
226    /// Clear scrollback
227    pub async fn clear(&self) {
228        self.lines.write().await.clear();
229        *self.total_size.write().await = 0;
230    }
231
232    /// Get total line count
233    pub async fn line_count(&self) -> usize {
234        self.lines.read().await.len()
235    }
236
237    /// Get total size in bytes
238    pub async fn total_size(&self) -> usize {
239        *self.total_size.read().await
240    }
241}
242
/// Fixed-capacity ring buffer that keeps the most recently written bytes.
///
/// Once `capacity` bytes are stored, each further byte overwrites the oldest
/// one.
pub struct RingBuffer {
    /// Backing storage; always exactly `capacity` bytes long.
    buffer: Vec<u8>,
    /// Maximum number of bytes retained.
    capacity: usize,
    /// Index at which the next byte will be written.
    head: usize,
    /// Index of the oldest retained byte.
    tail: usize,
    /// Number of bytes currently retained (never more than `capacity`).
    size: usize,
}

impl RingBuffer {
    /// Creates an empty ring buffer holding at most `capacity` bytes.
    ///
    /// A capacity of zero is allowed; such a buffer discards everything
    /// written to it.
    pub fn new(capacity: usize) -> Self {
        Self {
            buffer: vec![0; capacity],
            capacity,
            head: 0,
            tail: 0,
            size: 0,
        }
    }

    /// Appends `data`, overwriting the oldest bytes once the buffer is full.
    ///
    /// If `data` is longer than the capacity, only its final `capacity`
    /// bytes are retained.
    pub fn write(&mut self, data: &[u8]) {
        // A zero-capacity buffer has no slot to write into; indexing or
        // taking a modulo by the capacity would panic.
        if self.capacity == 0 {
            return;
        }

        // Bytes that would be overwritten later in this same call never
        // become visible, so skip them but still advance past their slots.
        let keep = &data[data.len().saturating_sub(self.capacity)..];
        let skipped = data.len() - keep.len();
        let start = (self.head + skipped % self.capacity) % self.capacity;

        // Copy in at most two runs: up to the end of storage, then from the
        // beginning.
        let first_len = keep.len().min(self.capacity - start);
        let (front, back) = keep.split_at(first_len);
        self.buffer[start..start + first_len].copy_from_slice(front);
        self.buffer[..back.len()].copy_from_slice(back);

        self.head = (start + keep.len()) % self.capacity;
        self.size = self.size.saturating_add(data.len()).min(self.capacity);
        // The oldest byte always sits `size` slots behind `head`.
        self.tail = (self.head + self.capacity - self.size) % self.capacity;
    }

    /// Returns a copy of all retained bytes, oldest first.
    pub fn read_all(&self) -> Vec<u8> {
        let mut result = Vec::with_capacity(self.size);

        if self.size == 0 {
            return result;
        }

        // The retained bytes start at `tail` and may wrap past the end of
        // the backing storage.
        let first_len = self.size.min(self.capacity - self.tail);
        result.extend_from_slice(&self.buffer[self.tail..self.tail + first_len]);
        result.extend_from_slice(&self.buffer[..self.size - first_len]);

        result
    }

    /// Discards all retained bytes. The backing storage is kept.
    pub fn clear(&mut self) {
        self.head = 0;
        self.tail = 0;
        self.size = 0;
    }
}
303
#[cfg(test)]
mod tests {
    use crate::buffer::{
        OutputBuffer, RingBuffer, ScrollbackBuffer, DEFAULT_MAX_SCROLLBACK, MAX_CHUNK_SIZE,
    };

    /// Crossing the configured size threshold flushes everything pending.
    #[test]
    fn test_output_buffer_respects_chunk_size() {
        let mut out = OutputBuffer::new(100);

        // 50 bytes: below the 100-byte threshold.
        let pending = out.push(&[b'a'; 50]);
        assert!(pending.is_none());

        // 60 more bytes: 110 pending, which is over the threshold.
        let emitted = out.push(&[b'b'; 60]);
        assert!(emitted.is_some());
        assert_eq!(emitted.unwrap().len(), 110);
    }

    /// Input larger than `MAX_CHUNK_SIZE` is flushed in part, leaving a tail.
    #[test]
    fn test_output_buffer_splits_large_chunks() {
        let mut out = OutputBuffer::with_default_size();
        let oversized = vec![b'x'; MAX_CHUNK_SIZE * 2 + 1000];

        // Something must have been flushed...
        let emitted = out.push(&oversized);
        assert!(emitted.is_some());

        // ...while a remainder smaller than one chunk stays buffered.
        let leftover = out.buffer.len();
        assert!(leftover != 0);
        assert!(leftover < MAX_CHUNK_SIZE);
    }

    /// Only the newest `max_lines` lines survive.
    #[tokio::test]
    async fn test_scrollback_buffer_enforces_line_limit() {
        let scrollback = ScrollbackBuffer::new(10);

        // Twice as many lines as the limit allows.
        for i in 0..20 {
            let text = format!("Line {i}\n");
            scrollback.add_output(text.as_bytes()).await;
        }

        let kept = scrollback.get_lines(0, 100).await;
        assert_eq!(kept.len(), 10);

        // The oldest survivor is the eleventh line written.
        let oldest = std::str::from_utf8(&kept[0].content).unwrap();
        assert!(oldest.contains("Line 10"));
    }

    /// The total byte size stays within the buffer's size limit.
    #[tokio::test]
    async fn test_scrollback_buffer_enforces_size_limit() {
        let scrollback = ScrollbackBuffer::new(1000);

        // Fifteen 1MB payloads, each followed by a newline.
        let payload = vec![b'x'; 1024 * 1024];
        for _ in 0..15 {
            scrollback.add_output(&payload).await;
            scrollback.add_output(b"\n").await;
        }

        let stored_bytes = *scrollback.total_size.read().await;
        assert!(stored_bytes <= scrollback.max_total_size);

        // The size limit must have evicted some of the lines.
        let kept = scrollback.get_lines(0, 1000).await;
        assert!(kept.len() < 15);
    }

    /// The default constructor caps the scrollback at `DEFAULT_MAX_SCROLLBACK`.
    #[tokio::test]
    async fn test_scrollback_default_size() {
        let scrollback = ScrollbackBuffer::with_default_size();

        // Write 100 lines more than the default limit.
        let written = DEFAULT_MAX_SCROLLBACK + 100;
        for i in 0..written {
            let text = format!("Line {i}\n");
            scrollback.add_output(text.as_bytes()).await;
        }

        let kept = scrollback.get_lines(0, DEFAULT_MAX_SCROLLBACK + 200).await;
        assert_eq!(kept.len(), DEFAULT_MAX_SCROLLBACK);
    }

    /// Writing past capacity drops the oldest bytes and keeps order.
    #[test]
    fn test_ring_buffer_wrap_around() {
        let mut ring = RingBuffer::new(10);

        // Exactly fill the buffer.
        ring.write(b"0123456789");
        assert_eq!(ring.read_all(), b"0123456789");

        // Five more bytes push out the five oldest.
        ring.write(b"ABCDE");
        let contents = ring.read_all();
        assert_eq!(contents.len(), 10);
        let (older, newer) = contents.split_at(5);
        assert_eq!(older, b"56789");
        assert_eq!(newer, b"ABCDE");
    }

    /// Case-sensitive and case-insensitive searches match different line sets.
    #[tokio::test]
    async fn test_scrollback_search_case_sensitive() {
        let scrollback = ScrollbackBuffer::new(100);

        scrollback.add_output(b"Hello World\n").await;
        scrollback.add_output(b"hello world\n").await;
        scrollback.add_output(b"HELLO WORLD\n").await;
        scrollback.add_output(b"Testing search functionality\n").await;

        // Exact case: only the first line matches.
        let exact = scrollback.search("Hello", true).await;
        assert_eq!(exact.len(), 1);
        let (_, hit) = &exact[0];
        assert!(String::from_utf8_lossy(&hit.content).contains("Hello World"));

        // Ignoring case: all three greeting lines match.
        let relaxed = scrollback.search("hello", false).await;
        assert_eq!(relaxed.len(), 3);
    }

    /// Substring search returns one hit per matching line.
    #[tokio::test]
    async fn test_scrollback_search_pattern_matching() {
        let scrollback = ScrollbackBuffer::new(100);

        scrollback.add_output(b"Error: File not found\n").await;
        scrollback.add_output(b"Warning: Deprecated function\n").await;
        scrollback.add_output(b"Info: Processing complete\n").await;
        scrollback.add_output(b"Error: Permission denied\n").await;

        let errors = scrollback.search("Error:", true).await;
        assert_eq!(errors.len(), 2);

        let warnings = scrollback.search("Warning:", true).await;
        assert_eq!(warnings.len(), 1);

        // A pattern that occurs nowhere yields no hits.
        let debug_hits = scrollback.search("Debug:", true).await;
        assert_eq!(debug_hits.len(), 0);
    }

    /// `get_lines` and `get_last_lines` return the requested window.
    #[tokio::test]
    async fn test_scrollback_get_lines_range() {
        let scrollback = ScrollbackBuffer::new(100);

        for i in 0..10 {
            let text = format!("Line {i}\n");
            scrollback.add_output(text.as_bytes()).await;
        }

        // Three lines starting at index 2.
        let window = scrollback.get_lines(2, 3).await;
        assert_eq!(window.len(), 3);
        assert!(String::from_utf8_lossy(&window[0].content).contains("Line 2"));
        assert!(String::from_utf8_lossy(&window[1].content).contains("Line 3"));
        assert!(String::from_utf8_lossy(&window[2].content).contains("Line 4"));

        // The newest three lines end with the last one written.
        let newest = scrollback.get_last_lines(3).await;
        assert_eq!(newest.len(), 3);
        assert!(String::from_utf8_lossy(&newest[2].content).contains("Line 9"));
    }

    /// `clear` empties the scrollback and zeroes its size counter.
    #[tokio::test]
    async fn test_scrollback_clear_functionality() {
        let scrollback = ScrollbackBuffer::new(100);

        scrollback.add_output(b"Test line 1\n").await;
        scrollback.add_output(b"Test line 2\n").await;

        // Both lines are present before clearing.
        let before = scrollback.get_lines(0, 10).await;
        assert_eq!(before.len(), 2);
        assert!(scrollback.total_size().await > 0);

        scrollback.clear().await;

        // Nothing remains afterwards.
        let after = scrollback.get_lines(0, 10).await;
        assert_eq!(after.len(), 0);
        assert_eq!(scrollback.total_size().await, 0);
        assert_eq!(scrollback.line_count().await, 0);
    }
}