use crate::kv_router::protocols::RouterEvent;
use crate::recorder::Recorder;
pub type KvRecorder = Recorder<RouterEvent>;
#[cfg(test)]
mod tests {
use super::*;
use crate::kv_router::indexer::KvIndexer;
use crate::kv_router::indexer::KvIndexerMetrics;
use crate::kv_router::protocols::*;
use std::time::Duration;
use tempfile::tempdir;
use tokio::fs;
use tokio_util::sync::CancellationToken;
fn make_blocks(hashes: Vec<u64>) -> Vec<KvCacheStoredBlockData> {
hashes
.iter()
.map(|i| KvCacheStoredBlockData {
tokens_hash: LocalBlockHash(*i),
block_hash: ExternalSequenceBlockHash(*i * 100),
mm_extra_info: None,
})
.collect()
}
fn add_blocks(
hashes: Vec<u64>,
parent_hash: Option<ExternalSequenceBlockHash>,
) -> KvCacheEventData {
KvCacheEventData::Stored(KvCacheStoreData {
parent_hash,
blocks: make_blocks(hashes),
})
}
fn create_store_event(
worker_id: WorkerId,
event_id: u64,
hashes: Vec<u64>,
parent: Option<ExternalSequenceBlockHash>,
) -> RouterEvent {
RouterEvent::new(
worker_id,
KvCacheEvent {
event_id,
data: add_blocks(hashes, parent),
dp_rank: 0,
},
)
}
fn create_remove_event(worker_id: WorkerId, event_id: u64, hashes: Vec<u64>) -> RouterEvent {
RouterEvent::new(
worker_id,
KvCacheEvent {
event_id,
data: KvCacheEventData::Removed(KvCacheRemoveData {
block_hashes: hashes
.iter()
.map(|i| ExternalSequenceBlockHash(*i * 100))
.collect(),
}),
dp_rank: 0,
},
)
}
#[tokio::test]
async fn test_recorder_streams_events_to_file() {
let dir = tempdir().unwrap();
let file_path = dir.path().join("kv_events.jsonl");
let token = CancellationToken::new();
let recorder = KvRecorder::new(token.clone(), &file_path, None, None, None)
.await
.unwrap();
let event_tx = recorder.event_sender();
let event1 = create_store_event(1, 42, vec![1, 2, 3], None);
let event2 = create_remove_event(1, 43, vec![2, 3]);
event_tx.send(event1).await.unwrap();
event_tx.send(event2).await.unwrap();
tokio::time::sleep(Duration::from_millis(10)).await;
assert_eq!(recorder.event_count().await, 2);
recorder.shutdown();
tokio::time::sleep(Duration::from_millis(10)).await;
let content = fs::read_to_string(&file_path).await.unwrap();
let lines: Vec<&str> = content.lines().collect();
println!("JSONL file content:");
for (i, line) in lines.iter().enumerate() {
println!("Line {}: {}", i + 1, line);
}
assert_eq!(lines.len(), 2, "Expected 2 lines in the file");
let indexer_token = CancellationToken::new();
let kv_block_size = 32; let kv_indexer_metrics = KvIndexerMetrics::new_unregistered();
let indexer = KvIndexer::new(
indexer_token.clone(),
kv_block_size,
kv_indexer_metrics.into(),
);
let indexer_event_tx = indexer.event_sender();
let count = KvRecorder::send_events(&file_path, &indexer_event_tx, false, None, None)
.await
.unwrap();
assert_eq!(count, 2, "Expected to send 2 events from file to indexer");
}
}