1use serde::{Deserialize, Serialize};
5use std::io;
6use std::path::{Path, PathBuf};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9use tokio::fs::{self, File, OpenOptions};
10use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
11use tokio::sync::{Mutex, mpsc};
12use tokio_util::sync::CancellationToken;
13
14#[derive(Serialize, Deserialize)]
16struct RecordEntry<T>
17where
18 T: Clone,
19{
20 timestamp: u64,
21 event: T,
22}
23
24#[derive(Debug)]
26pub struct Recorder<T> {
27 event_tx: mpsc::Sender<T>,
29 cancel: CancellationToken,
31 event_count: Arc<Mutex<usize>>,
33 first_event_time: Arc<Mutex<Option<Instant>>>,
35}
36
37impl<T> Recorder<T>
38where
39 T: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync + 'static,
40{
41 pub async fn new<P: AsRef<Path>>(
58 token: CancellationToken,
59 output_path: P,
60 max_lines_per_file: Option<usize>,
61 max_count: Option<usize>,
62 max_time: Option<f64>,
63 ) -> io::Result<Self> {
64 let (event_tx, mut event_rx) = mpsc::channel::<T>(2048);
65 let event_count = Arc::new(Mutex::new(0));
66 let event_count_clone = event_count.clone();
67 let cancel_clone = token.clone();
68 let start_time = Instant::now();
69 let first_event_time = Arc::new(Mutex::new(None));
70 let first_event_time_clone = first_event_time.clone();
71
72 if let Some(parent) = output_path.as_ref().parent()
74 && !parent.exists()
75 {
76 fs::create_dir_all(parent).await?;
77 }
78
79 let file = OpenOptions::new()
81 .create(true)
82 .write(true)
83 .truncate(true)
84 .open(&output_path)
85 .await?;
86
87 let file_path = output_path.as_ref().to_path_buf();
88
89 tokio::spawn(async move {
91 let start_time = start_time;
92 let mut writer = BufWriter::with_capacity(32768, file);
93 let mut line_count = 0;
94 let mut file_index = 0;
95 let base_path = file_path.clone();
96
97 let max_time_deadline = max_time.map(|secs| {
99 let duration = Duration::from_secs_f64(secs);
100 start_time + duration
101 });
102
103 loop {
104 if let Some(deadline) = max_time_deadline
106 && Instant::now() >= deadline
107 {
108 tracing::info!("Recorder reached max time limit, shutting down");
109 if let Err(e) = writer.flush().await {
111 tracing::error!("Failed to flush on time limit shutdown: {}", e);
112 }
113 cancel_clone.cancel();
114 return;
115 }
116
117 tokio::select! {
118 biased;
119
120 _ = cancel_clone.cancelled() => {
121 if let Err(e) = writer.flush().await {
123 tracing::error!("Failed to flush on shutdown: {}", e);
124 }
125
126 tracing::debug!("Recorder task shutting down");
127 return;
128 }
129
130 Some(event) = event_rx.recv() => {
131 {
133 let mut first_time = first_event_time_clone.lock().await;
134 if first_time.is_none() {
135 *first_time = Some(Instant::now());
136 }
137 }
138
139 let elapsed_ms = start_time.elapsed().as_millis() as u64;
141
142 let entry = RecordEntry {
144 timestamp: elapsed_ms,
145 event,
146 };
147
148 let json = match serde_json::to_string(&entry) {
150 Ok(json) => json,
151 Err(e) => {
152 tracing::error!("Failed to serialize event: {}", e);
153 continue;
154 }
155 };
156
157 if let Err(e) = writer.write_all(json.as_bytes()).await {
159 tracing::error!("Failed to write event: {}", e);
160 continue;
161 }
162
163 if let Err(e) = writer.write_all(b"\n").await {
165 tracing::error!("Failed to write newline: {}", e);
166 continue;
167 }
168
169 line_count += 1;
171
172 if let Some(max_lines) = max_lines_per_file
174 && line_count >= max_lines {
175 if let Err(e) = writer.flush().await {
177 tracing::error!("Failed to flush file before rotation: {}", e);
178 }
179
180 file_index += 1;
182 let new_path = create_rotated_path(&base_path, file_index);
183
184 match OpenOptions::new()
186 .create(true)
187 .write(true)
188 .truncate(true)
189 .open(&new_path)
190 .await
191 {
192 Ok(new_file) => {
193 writer = BufWriter::with_capacity(32768, new_file);
194 line_count = 0;
195 tracing::info!("Rotated to new file: {}", new_path.display());
196 },
197 Err(e) => {
198 tracing::error!("Failed to open rotated file {}: {}", new_path.display(), e);
199 }
201 }
202 }
203
204 let mut count = event_count_clone.lock().await;
206 *count += 1;
207
208 if let Some(max) = max_count
210 && *count >= max {
211 tracing::info!("Recorder reached max event count ({}), shutting down", max);
212 if let Err(e) = writer.flush().await {
214 tracing::error!("Failed to flush on count limit shutdown: {}", e);
215 }
216 drop(count);
218 cancel_clone.cancel();
219 return;
220 }
221 }
222 }
223 }
224 });
225
226 Ok(Self {
227 event_tx,
228 cancel: token,
229 event_count,
230 first_event_time,
231 })
232 }
233
234 pub fn event_sender(&self) -> mpsc::Sender<T> {
236 self.event_tx.clone()
237 }
238
239 pub async fn event_count(&self) -> usize {
241 *self.event_count.lock().await
242 }
243
244 pub async fn elapsed_time(&self) -> io::Result<Duration> {
248 let first_time = self.first_event_time.lock().await;
249 match *first_time {
250 Some(time) => Ok(time.elapsed()),
251 None => Err(io::Error::other("No events received yet")),
252 }
253 }
254
255 pub fn shutdown(&self) {
257 self.cancel.cancel();
258 }
259
260 pub async fn send_events<P: AsRef<Path>>(
275 filename: P,
276 event_tx: &mpsc::Sender<T>,
277 timed: bool,
278 max_count: Option<usize>,
279 max_time: Option<f64>,
280 ) -> io::Result<usize> {
281 let display_name = filename.as_ref().display().to_string();
283
284 if !filename.as_ref().exists() {
286 return Err(io::Error::new(
287 io::ErrorKind::NotFound,
288 format!("File not found: {}", display_name),
289 ));
290 }
291
292 let start_time = Instant::now();
294 let deadline = max_time.map(|secs| start_time + Duration::from_secs_f64(secs));
295
296 let file = File::open(&filename).await?;
298 let reader = BufReader::with_capacity(32768, file);
299 let mut lines = reader.lines();
300
301 let mut count = 0;
302 let mut line_number = 0;
303 let mut prev_timestamp: Option<u64> = None;
304
305 while let Some(line) = lines.next_line().await? {
307 if let Some(max) = max_count
309 && count >= max
310 {
311 tracing::info!("Reached maximum event count ({}), stopping", max);
312 break;
313 }
314
315 if let Some(end_time) = deadline
317 && Instant::now() >= end_time
318 {
319 tracing::info!("Reached maximum time limit, stopping");
320 break;
321 }
322
323 line_number += 1;
324
325 if line.trim().is_empty() {
327 continue;
328 }
329
330 let record: RecordEntry<T> = match serde_json::from_str(&line) {
332 Ok(record) => record,
333 Err(e) => {
334 tracing::warn!(
335 "Failed to parse JSON on line {}: {}. Skipping.",
336 line_number,
337 e
338 );
339 continue;
340 }
341 };
342
343 let timestamp = record.timestamp;
344 let event = record.event;
345
346 if timed
348 && let Some(prev) = prev_timestamp
349 && timestamp > prev
350 {
351 let wait_time = timestamp - prev;
352 tokio::time::sleep(Duration::from_millis(wait_time)).await;
353 }
354
355 event_tx
357 .send(event)
358 .await
359 .map_err(|e| io::Error::other(format!("Failed to send event: {e}")))?;
360
361 prev_timestamp = Some(timestamp);
363 count += 1;
364 }
365
366 if count == 0 {
367 tracing::warn!("No events to send from file: {}", display_name);
368 } else {
369 tracing::info!("Sent {} events from {}", count, display_name);
370 }
371
372 Ok(count)
373 }
374}
375
376impl<T> Drop for Recorder<T> {
377 fn drop(&mut self) {
378 self.cancel.cancel();
379 }
380}
381
/// Builds the path of the `index`-th rotated file derived from `base_path`.
///
/// The index is inserted into the file name just before its last `.` (so
/// `events.jsonl` becomes `events1.jsonl`), or appended when the file name has no
/// dot (`events` becomes `events1`). Only the final path component is rewritten:
/// dots in directory names are never treated as an extension separator.
///
/// If `base_path` has no final file-name component (for example an empty path),
/// the index is appended to the path as a whole.
fn create_rotated_path(base_path: &Path, index: usize) -> PathBuf {
    let Some(file_name) = base_path.file_name() else {
        let mut raw = base_path.as_os_str().to_os_string();
        raw.push(index.to_string());
        return PathBuf::from(raw);
    };

    let name = file_name.to_string_lossy();
    let rotated = match name.rfind('.') {
        Some(dot) => {
            let (stem, extension) = name.split_at(dot);
            format!("{stem}{index}{extension}")
        }
        None => format!("{name}{index}"),
    };

    // Replace only the file name so the directory part is kept exactly as given.
    base_path.with_file_name(rotated)
}
395
396#[cfg(test)]
397mod tests {
398 use super::*;
399 use std::time::Duration;
400 use tempfile::tempdir;
401
402 type TestEventRecorder = Recorder<TestEvent>;
404
405 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
407 struct TestEvent {
408 id: u64,
409 name: String,
410 values: Vec<i32>,
411 }
412
413 impl TestEvent {
414 fn new(id: u64) -> Self {
416 let num_values = rand::random_range(1..=100);
418
419 let values = (0..num_values)
421 .map(|_| rand::random_range(-100..=100))
422 .collect();
423
424 let name = format!("event_{}", id);
426
427 TestEvent { id, name, values }
428 }
429
430 fn generate_events(count: usize) -> Vec<Self> {
432 (0..count).map(|i| Self::new(i as u64)).collect()
433 }
434 }
435
436 #[tokio::test]
437 async fn test_recorder_streams_events_to_file() {
438 let dir = tempdir().unwrap();
440 let file_path = dir.path().join("events.jsonl");
441
442 let token = CancellationToken::new();
443 let recorder = TestEventRecorder::new(token.clone(), &file_path, None, None, None)
444 .await
445 .unwrap();
446 let event_tx = recorder.event_sender();
447
448 let events = TestEvent::generate_events(2);
450 let event1 = events[0].clone();
451 let event2 = events[1].clone();
452
453 tokio::time::sleep(Duration::from_millis(10)).await;
455
456 for event in &events {
458 event_tx.send(event.clone()).await.unwrap();
459 }
460
461 tokio::time::sleep(Duration::from_millis(10)).await;
463
464 assert_eq!(recorder.event_count().await, 2);
466
467 let elapsed_ms = recorder.elapsed_time().await.unwrap().as_millis();
469 if !(7..=13).contains(&elapsed_ms) {
470 println!("Actual elapsed time: {} ms", elapsed_ms);
471 assert!((7..=13).contains(&elapsed_ms));
472 }
473
474 recorder.shutdown();
476 tokio::time::sleep(Duration::from_millis(10)).await;
477
478 let content = fs::read_to_string(&file_path).await.unwrap();
480 let lines: Vec<&str> = content.lines().collect();
481
482 println!("JSONL file content:");
484 for (i, line) in lines.iter().enumerate() {
485 println!("Line {}: {}", i + 1, line);
486 }
487
488 assert_eq!(lines.len(), 2, "Expected 2 lines in the file");
489
490 let entry1: RecordEntry<TestEvent> = serde_json::from_str(lines[0]).unwrap();
492 let entry2: RecordEntry<TestEvent> = serde_json::from_str(lines[1]).unwrap();
493
494 assert_eq!(entry1.event, event1);
495 assert_eq!(entry2.event, event2);
496 assert!(entry2.timestamp >= entry1.timestamp);
497 }
498
499 #[ignore]
500 #[tokio::test]
501 async fn load_test_100k_events() {
502 let dir = tempdir().unwrap();
504 let file_path = dir.path().join("events.jsonl");
505
506 let token = CancellationToken::new();
508
509 const MAX_LINES_PER_FILE: usize = 10_000;
511 let recorder = TestEventRecorder::new(
512 token.clone(),
513 &file_path,
514 Some(MAX_LINES_PER_FILE),
515 None,
516 None,
517 )
518 .await
519 .unwrap();
520 let event_tx = recorder.event_sender();
521
522 const NUM_EVENTS: usize = 100_000;
524 println!("Generating {} events...", NUM_EVENTS);
525
526 let events = TestEvent::generate_events(NUM_EVENTS);
528
529 for (i, event) in events.iter().enumerate() {
531 event_tx.send(event.clone()).await.unwrap();
532
533 if i > 0 && i % 10_000 == 0 {
535 println!("Sent {} events...", i);
536 }
537 }
538
539 println!("Waiting for events to be processed...");
541 tokio::time::sleep(Duration::from_millis(1000)).await;
542
543 let count = recorder.event_count().await;
545 println!("Recorded event count: {}", count);
546 assert_eq!(count, NUM_EVENTS);
547
548 recorder.shutdown();
550 tokio::time::sleep(Duration::from_millis(100)).await;
551
552 let base_file = file_path.clone();
554 let mut found_files = Vec::new();
555
556 if base_file.exists() {
558 found_files.push(base_file.clone());
559 }
560
561 for i in 1..=9 {
563 let rotated_path = create_rotated_path(&base_file, i);
564 if rotated_path.exists() {
565 found_files.push(rotated_path);
566 }
567 }
568
569 assert_eq!(
571 found_files.len(),
572 10,
573 "Expected 10 files due to rotation with 10k events each"
574 );
575
576 for (i, file_path) in found_files.iter().enumerate() {
578 let content = fs::read_to_string(file_path).await.unwrap();
579 let line_count = content.lines().count();
580
581 if i < found_files.len() - 1 {
582 assert_eq!(
584 line_count,
585 MAX_LINES_PER_FILE,
586 "File {} should contain exactly {} lines",
587 file_path.display(),
588 MAX_LINES_PER_FILE
589 );
590 } else {
591 assert!(
593 line_count <= MAX_LINES_PER_FILE,
594 "Last file should contain at most {} lines",
595 MAX_LINES_PER_FILE
596 );
597 }
598 }
599
600 let mut total_lines = 0;
602
603 for (i, file_path) in found_files.iter().enumerate() {
605 println!("Checking file {}: {}", i, file_path.display());
606
607 let content = fs::read_to_string(file_path).await.unwrap();
609 let line_count = content.lines().count();
610
611 if i < found_files.len() - 1 {
613 assert_eq!(
614 line_count, MAX_LINES_PER_FILE,
615 "Each file except possibly the last should have exactly MAX_LINES_PER_FILE lines"
616 );
617 }
618
619 total_lines += line_count;
620
621 let file = File::open(file_path).await.unwrap();
623 let reader = BufReader::new(file);
624 let mut lines = reader.lines();
625
626 let mut prev_timestamp: Option<u64> = None;
627 let mut line_number = 0;
628 let mut unsorted_count = 0;
629
630 while let Some(line) = lines.next_line().await.unwrap() {
632 line_number += 1;
633 let entry: RecordEntry<TestEvent> = serde_json::from_str(&line).unwrap();
634
635 if let Some(prev) = prev_timestamp
636 && entry.timestamp < prev
637 {
638 unsorted_count += 1;
639 if unsorted_count <= 5 {
640 println!(
642 "Timestamp order violation in file {} at line {}: {} < {}",
643 file_path.display(),
644 line_number,
645 entry.timestamp,
646 prev
647 );
648 }
649 }
650
651 prev_timestamp = Some(entry.timestamp);
652 }
653
654 assert_eq!(
655 unsorted_count, 0,
656 "Timestamps should be weakly sorted within each file"
657 );
658 }
659
660 assert_eq!(
661 total_lines, NUM_EVENTS,
662 "Total lines across all files should match NUM_EVENTS"
663 );
664
665 println!("Load test with file rotation completed successfully");
666 }
667}