orchflow_terminal/
buffer.rs1use bytes::Bytes;
7use std::collections::VecDeque;
8use std::sync::Arc;
9use tokio::sync::RwLock;
10
/// Number of lines a [`ScrollbackBuffer`] keeps when built with
/// [`ScrollbackBuffer::with_default_size`].
const DEFAULT_MAX_SCROLLBACK: usize = 10_000;
13
14const MAX_CHUNK_SIZE: usize = 64 * 1024; pub struct OutputBuffer {
19 buffer: Vec<u8>,
20 max_size: usize,
21 flush_interval: tokio::time::Duration,
22 last_flush: tokio::time::Instant,
23}
24
25impl OutputBuffer {
26 pub fn new(max_size: usize) -> Self {
27 Self {
28 buffer: Vec::with_capacity(max_size),
29 max_size,
30 flush_interval: tokio::time::Duration::from_millis(16), last_flush: tokio::time::Instant::now(),
32 }
33 }
34
35 pub fn with_default_size() -> Self {
37 Self::new(MAX_CHUNK_SIZE)
38 }
39
40 pub fn push(&mut self, data: &[u8]) -> Option<Bytes> {
42 if data.len() > MAX_CHUNK_SIZE {
44 let mut result = Vec::new();
45 for chunk in data.chunks(MAX_CHUNK_SIZE) {
46 self.buffer.extend_from_slice(chunk);
47 if self.should_flush() {
48 result.push(self.flush());
49 }
50 }
51 if result.is_empty() {
52 None
53 } else {
54 let total_len = result.iter().map(|b| b.len()).sum();
56 let mut combined = Vec::with_capacity(total_len);
57 for bytes in result {
58 combined.extend_from_slice(&bytes);
59 }
60 Some(Bytes::from(combined))
61 }
62 } else {
63 self.buffer.extend_from_slice(data);
64
65 if self.should_flush() {
67 Some(self.flush())
68 } else {
69 None
70 }
71 }
72 }
73
74 pub fn should_flush(&self) -> bool {
76 self.buffer.len() >= self.max_size || self.last_flush.elapsed() >= self.flush_interval
77 }
78
79 pub fn flush(&mut self) -> Bytes {
81 let data = Bytes::from(self.buffer.clone());
82 self.buffer.clear();
83 self.last_flush = tokio::time::Instant::now();
84 data
85 }
86
87 pub fn force_flush(&mut self) -> Option<Bytes> {
89 if !self.buffer.is_empty() {
90 Some(self.flush())
91 } else {
92 None
93 }
94 }
95}
96
97pub struct ScrollbackBuffer {
99 lines: Arc<RwLock<VecDeque<ScrollbackLine>>>,
100 max_lines: usize,
101 total_size: Arc<RwLock<usize>>,
102 max_total_size: usize,
103}
104
105#[derive(Clone)]
106pub struct ScrollbackLine {
107 pub content: Bytes,
108 pub timestamp: chrono::DateTime<chrono::Utc>,
109 pub line_number: usize,
110}
111
112impl ScrollbackBuffer {
113 pub fn new(max_lines: usize) -> Self {
114 Self {
115 lines: Arc::new(RwLock::new(VecDeque::with_capacity(max_lines))),
116 max_lines,
117 total_size: Arc::new(RwLock::new(0)),
118 max_total_size: 10 * 1024 * 1024, }
120 }
121
122 pub fn with_default_size() -> Self {
124 Self::new(DEFAULT_MAX_SCROLLBACK)
125 }
126
127 pub async fn add_output(&self, data: &[u8]) {
129 let mut current_line = Vec::new();
130 let mut lines = self.lines.write().await;
131 let mut total_size = self.total_size.write().await;
132
133 for &byte in data {
135 current_line.push(byte);
136
137 if byte == b'\n' {
138 let line = ScrollbackLine {
140 content: Bytes::from(current_line.clone()),
141 timestamp: chrono::Utc::now(),
142 line_number: lines.len(),
143 };
144
145 *total_size += line.content.len();
146 lines.push_back(line);
147 current_line.clear();
148
149 while lines.len() > self.max_lines || *total_size > self.max_total_size {
151 if let Some(removed) = lines.pop_front() {
152 *total_size -= removed.content.len();
153 }
154 }
155 }
156 }
157
158 if !current_line.is_empty() {
160 if let Some(last_line) = lines.back_mut() {
162 let mut combined = last_line.content.to_vec();
163 combined.extend_from_slice(¤t_line);
164 *total_size -= last_line.content.len();
165 last_line.content = Bytes::from(combined);
166 *total_size += last_line.content.len();
167 } else {
168 let line = ScrollbackLine {
170 content: Bytes::from(current_line),
171 timestamp: chrono::Utc::now(),
172 line_number: 0,
173 };
174 *total_size += line.content.len();
175 lines.push_back(line);
176 }
177 }
178 }
179
180 pub async fn get_lines(&self, start: usize, count: usize) -> Vec<ScrollbackLine> {
182 let lines = self.lines.read().await;
183 lines.iter().skip(start).take(count).cloned().collect()
184 }
185
186 pub async fn get_last_lines(&self, count: usize) -> Vec<ScrollbackLine> {
188 let lines = self.lines.read().await;
189 let start = lines.len().saturating_sub(count);
190 lines.iter().skip(start).cloned().collect()
191 }
192
193 pub async fn search(
195 &self,
196 pattern: &str,
197 case_sensitive: bool,
198 ) -> Vec<(usize, ScrollbackLine)> {
199 let lines = self.lines.read().await;
200 let pattern = if case_sensitive {
201 pattern.to_string()
202 } else {
203 pattern.to_lowercase()
204 };
205
206 lines
207 .iter()
208 .enumerate()
209 .filter_map(|(idx, line)| {
210 let content = String::from_utf8_lossy(&line.content);
211 let content_to_search = if case_sensitive {
212 content.to_string()
213 } else {
214 content.to_lowercase()
215 };
216
217 if content_to_search.contains(&pattern) {
218 Some((idx, line.clone()))
219 } else {
220 None
221 }
222 })
223 .collect()
224 }
225
226 pub async fn clear(&self) {
228 self.lines.write().await.clear();
229 *self.total_size.write().await = 0;
230 }
231
232 pub async fn line_count(&self) -> usize {
234 self.lines.read().await.len()
235 }
236
237 pub async fn total_size(&self) -> usize {
239 *self.total_size.read().await
240 }
241}
242
/// Fixed-capacity byte ring that keeps the most recently written bytes.
pub struct RingBuffer {
    /// Backing storage; always exactly `capacity` bytes long.
    buffer: Vec<u8>,
    /// Maximum number of bytes retained.
    capacity: usize,
    /// Index at which the next byte will be written.
    head: usize,
    /// Index of the oldest retained byte.
    tail: usize,
    /// Number of bytes currently retained (never more than `capacity`).
    size: usize,
}

impl RingBuffer {
    /// Creates an empty ring that retains at most `capacity` bytes.
    ///
    /// A capacity of 0 is allowed; such a ring discards everything written to it.
    pub fn new(capacity: usize) -> Self {
        Self {
            buffer: vec![0; capacity],
            capacity,
            head: 0,
            tail: 0,
            size: 0,
        }
    }

    /// Appends `data`, overwriting the oldest bytes once the ring is full.
    pub fn write(&mut self, data: &[u8]) {
        // A zero-capacity ring has no slot to index and no modulus to wrap with.
        if self.capacity == 0 {
            return;
        }

        for &byte in data {
            self.buffer[self.head] = byte;
            self.head = (self.head + 1) % self.capacity;

            if self.size < self.capacity {
                self.size += 1;
            } else {
                // Full: the slot just overwritten was the oldest byte, so the tail
                // moves on to the next-oldest one.
                self.tail = (self.tail + 1) % self.capacity;
            }
        }
    }

    /// Returns the retained bytes in the order they were written (oldest first).
    pub fn read_all(&self) -> Vec<u8> {
        if self.size == 0 {
            return Vec::new();
        }

        let mut result = Vec::with_capacity(self.size);
        if self.tail < self.head {
            // Contiguous region.
            result.extend_from_slice(&self.buffer[self.tail..self.head]);
        } else {
            // Wrapped region: tail to the end of storage, then start of storage to head.
            result.extend_from_slice(&self.buffer[self.tail..]);
            result.extend_from_slice(&self.buffer[..self.head]);
        }
        result
    }

    /// Forgets all retained bytes. The backing storage is left as it is.
    pub fn clear(&mut self) {
        self.head = 0;
        self.tail = 0;
        self.size = 0;
    }
}
303
#[cfg(test)]
mod tests {
    use crate::buffer::{
        OutputBuffer, RingBuffer, ScrollbackBuffer, DEFAULT_MAX_SCROLLBACK, MAX_CHUNK_SIZE,
    };

    /// Lossily decodes line bytes so assertions can use substring checks.
    fn text(bytes: &[u8]) -> String {
        String::from_utf8_lossy(bytes).into_owned()
    }

    /// Feeds `count` newline-terminated lines "Line 0", "Line 1", ... into `scrollback`.
    async fn feed_numbered_lines(scrollback: &ScrollbackBuffer, count: usize) {
        for i in 0..count {
            let line = format!("Line {i}\n");
            scrollback.add_output(line.as_bytes()).await;
        }
    }

    /// Feeds each of `rows` into `scrollback` as a separate `add_output` call.
    async fn feed_rows(scrollback: &ScrollbackBuffer, rows: &[&[u8]]) {
        for row in rows {
            scrollback.add_output(row).await;
        }
    }

    #[test]
    fn test_output_buffer_respects_chunk_size() {
        let mut output = OutputBuffer::new(100);

        // 50 bytes stay below the 100-byte threshold.
        let first = vec![b'a'; 50];
        assert!(output.push(&first).is_none());

        // 60 more bytes cross the threshold, so everything pending is flushed together.
        let second = vec![b'b'; 60];
        let flushed = output.push(&second);
        assert!(flushed.is_some());
        assert_eq!(flushed.unwrap().len(), 110);
    }

    #[test]
    fn test_output_buffer_splits_large_chunks() {
        let mut output = OutputBuffer::with_default_size();

        // Two full chunks plus a 1000-byte remainder.
        let oversized = vec![b'x'; MAX_CHUNK_SIZE * 2 + 1000];
        let flushed = output.push(&oversized);

        assert!(flushed.is_some());

        // The remainder is expected to still be pending.
        assert!(!output.buffer.is_empty());
        assert!(output.buffer.len() < MAX_CHUNK_SIZE);
    }

    #[tokio::test]
    async fn test_scrollback_buffer_enforces_line_limit() {
        let scrollback = ScrollbackBuffer::new(10);
        feed_numbered_lines(&scrollback, 20).await;

        let kept = scrollback.get_lines(0, 100).await;
        assert_eq!(kept.len(), 10);

        // The ten oldest lines must be gone.
        let oldest = String::from_utf8(kept[0].content.to_vec()).unwrap();
        assert!(oldest.contains("Line 10"));
    }

    #[tokio::test]
    async fn test_scrollback_buffer_enforces_size_limit() {
        let scrollback = ScrollbackBuffer::new(1000);

        // 15 lines of 1 MiB each, every one fed as body first and newline second.
        let megabyte = vec![b'x'; 1024 * 1024];
        for _ in 0..15 {
            scrollback.add_output(&megabyte).await;
            scrollback.add_output(b"\n").await;
        }

        let stored_bytes = *scrollback.total_size.read().await;
        assert!(stored_bytes <= scrollback.max_total_size);

        let kept = scrollback.get_lines(0, 1000).await;
        assert!(kept.len() < 15);
    }

    #[tokio::test]
    async fn test_scrollback_default_size() {
        let scrollback = ScrollbackBuffer::with_default_size();
        feed_numbered_lines(&scrollback, DEFAULT_MAX_SCROLLBACK + 100).await;

        let kept = scrollback.get_lines(0, DEFAULT_MAX_SCROLLBACK + 200).await;
        assert_eq!(kept.len(), DEFAULT_MAX_SCROLLBACK);
    }

    #[test]
    fn test_ring_buffer_wrap_around() {
        let mut ring = RingBuffer::new(10);

        // Fill the ring exactly.
        ring.write(b"0123456789");
        assert_eq!(ring.read_all(), b"0123456789");

        // Five more bytes push out the five oldest ones.
        ring.write(b"ABCDE");
        let contents = ring.read_all();
        assert_eq!(contents.len(), 10);
        assert_eq!(&contents[0..5], b"56789");
        assert_eq!(&contents[5..10], b"ABCDE");
    }

    #[tokio::test]
    async fn test_scrollback_search_case_sensitive() {
        let scrollback = ScrollbackBuffer::new(100);
        feed_rows(
            &scrollback,
            &[
                b"Hello World\n",
                b"hello world\n",
                b"HELLO WORLD\n",
                b"Testing search functionality\n",
            ],
        )
        .await;

        let exact = scrollback.search("Hello", true).await;
        assert_eq!(exact.len(), 1);
        assert!(text(&exact[0].1.content).contains("Hello World"));

        let relaxed = scrollback.search("hello", false).await;
        assert_eq!(relaxed.len(), 3);
    }

    #[tokio::test]
    async fn test_scrollback_search_pattern_matching() {
        let scrollback = ScrollbackBuffer::new(100);
        feed_rows(
            &scrollback,
            &[
                b"Error: File not found\n",
                b"Warning: Deprecated function\n",
                b"Info: Processing complete\n",
                b"Error: Permission denied\n",
            ],
        )
        .await;

        assert_eq!(scrollback.search("Error:", true).await.len(), 2);
        assert_eq!(scrollback.search("Warning:", true).await.len(), 1);
        assert_eq!(scrollback.search("Debug:", true).await.len(), 0);
    }

    #[tokio::test]
    async fn test_scrollback_get_lines_range() {
        let scrollback = ScrollbackBuffer::new(100);
        feed_numbered_lines(&scrollback, 10).await;

        let window = scrollback.get_lines(2, 3).await;
        assert_eq!(window.len(), 3);
        assert!(text(&window[0].content).contains("Line 2"));
        assert!(text(&window[1].content).contains("Line 3"));
        assert!(text(&window[2].content).contains("Line 4"));

        let newest = scrollback.get_last_lines(3).await;
        assert_eq!(newest.len(), 3);
        assert!(text(&newest[2].content).contains("Line 9"));
    }

    #[tokio::test]
    async fn test_scrollback_clear_functionality() {
        let scrollback = ScrollbackBuffer::new(100);
        feed_rows(&scrollback, &[b"Test line 1\n", b"Test line 2\n"]).await;

        let before = scrollback.get_lines(0, 10).await;
        assert_eq!(before.len(), 2);
        assert!(scrollback.total_size().await > 0);

        scrollback.clear().await;

        let after = scrollback.get_lines(0, 10).await;
        assert_eq!(after.len(), 0);
        assert_eq!(scrollback.total_size().await, 0);
        assert_eq!(scrollback.line_count().await, 0);
    }
}