mod common;
use common::{ContextBuilder, create_success_step, setup_memory_with_config};
use do_memory_core::{
BatchConfig, ExecutionStep, MemoryConfig, TaskContext, TaskOutcome, TaskType,
};
use std::time::{Duration, Instant};
use tokio::time::sleep;
/// Checks size-triggered auto-flush with `max_batch_size: 10`.
///
/// Nine steps are logged and the test expects none of them to be visible via
/// `get_episode`; after the tenth `log_step` it expects all ten to be
/// persisted with `step_number` running 1 through 10 in order.
#[tokio::test]
async fn test_step_buffering_with_auto_flush_on_size() -> anyhow::Result<()> {
    let config = MemoryConfig {
        batch_config: Some(BatchConfig {
            max_batch_size: 10,
            // 60 s interval: presumably long enough that only the size
            // threshold can fire while this test runs.
            flush_interval_ms: 60000,
            auto_flush: true,
        }),
        quality_threshold: 0.0,
        ..Default::default()
    };
    let memory = setup_memory_with_config(config);
    let context = ContextBuilder::new("step-batching-test")
        .language("rust")
        .build();
    let episode_id = memory
        .start_episode(
            "Test size-based auto-flush".to_string(),
            context,
            TaskType::Testing,
        )
        .await;

    // One step short of the batch size: nothing is expected to be persisted.
    for i in 1..=9 {
        let step = create_success_step(i, "test_tool", &format!("Action {i}"));
        memory.log_step(episode_id, step).await;
    }
    let episode = memory.get_episode(episode_id).await?;
    assert_eq!(
        episode.steps.len(),
        0,
        "Steps should still be buffered, not persisted"
    );

    // The tenth step reaches `max_batch_size`.
    let step = create_success_step(10, "test_tool", "Action 10");
    memory.log_step(episode_id, step).await;
    let episode = memory.get_episode(episode_id).await?;
    assert_eq!(
        episode.steps.len(),
        10,
        "All 10 steps should be flushed and persisted"
    );

    // Walk the persisted steps directly instead of indexing a hard-coded
    // `0..10`; the length assertion above already pins the count.
    for (idx, step) in episode.steps.iter().enumerate() {
        assert_eq!(
            step.step_number,
            idx + 1,
            "Step ordering should be preserved"
        );
    }
    Ok(())
}
/// Exercises the time-based flush path.
///
/// Three steps are logged and expected to remain buffered. After sleeping
/// 150 ms (past the 100 ms `flush_interval_ms`), a fourth `log_step` call is
/// made and the test expects all four steps to be persisted.
#[tokio::test]
async fn test_step_buffering_with_auto_flush_on_time() -> anyhow::Result<()> {
    // The batch size is far above the four steps logged here, so the size
    // threshold is never reached in this test.
    let batching = BatchConfig {
        max_batch_size: 1000,
        flush_interval_ms: 100,
        auto_flush: true,
    };
    let memory = setup_memory_with_config(MemoryConfig {
        batch_config: Some(batching),
        quality_threshold: 0.0,
        ..Default::default()
    });
    let context = ContextBuilder::new("time-flush-test")
        .language("rust")
        .build();
    let episode_id = memory
        .start_episode(
            "Test time-based auto-flush".to_string(),
            context,
            TaskType::Testing,
        )
        .await;

    for i in 1..=3 {
        let action = format!("Action {i}");
        memory
            .log_step(episode_id, create_success_step(i, "test_tool", &action))
            .await;
    }
    let before_wait = memory.get_episode(episode_id).await?;
    assert_eq!(
        before_wait.steps.len(),
        0,
        "Steps should still be buffered initially"
    );

    // Wait past the flush interval, then log one more step.
    sleep(Duration::from_millis(150)).await;
    let trigger = create_success_step(4, "test_tool", "Action 4");
    memory.log_step(episode_id, trigger).await;

    let after_wait = memory.get_episode(episode_id).await?;
    assert_eq!(
        after_wait.steps.len(),
        4,
        "All steps should be flushed due to time threshold"
    );
    Ok(())
}
/// Walks through two explicit `flush_steps` calls on a single episode.
///
/// With a batch size of 100 and a 60 s interval, the test expects five logged
/// steps to stay invisible until the first manual flush, a sixth step to stay
/// invisible until the second flush, and six persisted steps at the end.
#[tokio::test]
async fn test_manual_flush() -> anyhow::Result<()> {
    let batching = BatchConfig {
        max_batch_size: 100,
        flush_interval_ms: 60000,
        auto_flush: true,
    };
    let memory = setup_memory_with_config(MemoryConfig {
        batch_config: Some(batching),
        quality_threshold: 0.0,
        ..Default::default()
    });
    let context = ContextBuilder::new("manual-flush-test")
        .language("rust")
        .build();
    let episode_id = memory
        .start_episode("Test manual flush".to_string(), context, TaskType::Testing)
        .await;

    // First round: five steps, logged but not yet flushed.
    for i in 1..=5 {
        let action = format!("Action {i}");
        memory
            .log_step(episode_id, create_success_step(i, "test_tool", &action))
            .await;
    }
    let before_first_flush = memory.get_episode(episode_id).await?;
    assert_eq!(
        before_first_flush.steps.len(),
        0,
        "Steps should still be buffered"
    );

    memory.flush_steps(episode_id).await?;
    let after_first_flush = memory.get_episode(episode_id).await?;
    assert_eq!(
        after_first_flush.steps.len(),
        5,
        "All steps should be flushed"
    );

    // Second round: one more step, expected to wait for the next flush.
    let sixth = create_success_step(6, "test_tool", "Action 6");
    memory.log_step(episode_id, sixth).await;
    let before_second_flush = memory.get_episode(episode_id).await?;
    assert_eq!(
        before_second_flush.steps.len(),
        5,
        "New step should be in fresh buffer"
    );

    memory.flush_steps(episode_id).await?;
    let after_second_flush = memory.get_episode(episode_id).await?;
    assert_eq!(
        after_second_flush.steps.len(),
        6,
        "Second flush should persist new step"
    );
    Ok(())
}
/// Confirms that completing an episode persists its buffered steps.
///
/// Eight steps are logged under a config whose thresholds (100 steps / 60 s)
/// are not reached by this test, then `complete_episode` is called with a
/// success outcome. The stored episode is expected to be complete, to hold
/// all eight steps, and to carry both a reward and a reflection.
#[tokio::test]
async fn test_complete_episode_flushes_steps() -> anyhow::Result<()> {
    let batching = BatchConfig {
        max_batch_size: 100,
        flush_interval_ms: 60000,
        auto_flush: true,
    };
    let memory = setup_memory_with_config(MemoryConfig {
        batch_config: Some(batching),
        quality_threshold: 0.0,
        ..Default::default()
    });
    let context = ContextBuilder::new("complete-flush-test")
        .language("rust")
        .build();
    let episode_id = memory
        .start_episode(
            "Test completion flushes steps".to_string(),
            context,
            TaskType::Testing,
        )
        .await;

    for i in 1..=8 {
        let action = format!("Action {i}");
        memory
            .log_step(episode_id, create_success_step(i, "test_tool", &action))
            .await;
    }
    let in_progress = memory.get_episode(episode_id).await?;
    assert_eq!(in_progress.steps.len(), 0, "Steps should be buffered");

    // Completing the episode is the only flush trigger used in this test.
    let outcome = TaskOutcome::Success {
        verdict: "Task completed".to_string(),
        artifacts: vec![],
    };
    memory.complete_episode(episode_id, outcome).await?;

    let completed_episode = memory.get_episode(episode_id).await?;
    assert!(
        completed_episode.is_complete(),
        "Episode should be complete"
    );
    assert_eq!(
        completed_episode.steps.len(),
        8,
        "All buffered steps should be flushed on completion"
    );
    assert!(
        completed_episode.reward.is_some(),
        "Completed episode should have reward"
    );
    assert!(
        completed_episode.reflection.is_some(),
        "Completed episode should have reflection"
    );
    Ok(())
}
/// With `batch_config: None`, expects every logged step to be readable from
/// `get_episode` right after its own `log_step` call returns.
#[tokio::test]
async fn test_batching_disabled() -> anyhow::Result<()> {
    let memory = setup_memory_with_config(MemoryConfig {
        batch_config: None,
        quality_threshold: 0.0,
        ..Default::default()
    });
    let context = ContextBuilder::new("no-batch-test")
        .language("rust")
        .build();
    let episode_id = memory
        .start_episode(
            "Test immediate persistence".to_string(),
            context,
            TaskType::Testing,
        )
        .await;

    // After the i-th step is logged, exactly i steps should be stored.
    for i in 1..=5 {
        let action = format!("Action {i}");
        memory
            .log_step(episode_id, create_success_step(i, "test_tool", &action))
            .await;

        let snapshot = memory.get_episode(episode_id).await?;
        assert_eq!(
            snapshot.steps.len(),
            i,
            "Step {i} should be immediately persisted"
        );
    }
    Ok(())
}
/// Buffers steps for three open episodes on one memory instance, then flushes
/// each of them manually.
///
/// Episode `idx` receives `(idx + 1) * 3` steps (3, 6 and 9), all below the
/// batch size of 20. The test expects nothing to be persisted before the
/// manual flushes and, afterwards, each episode to hold exactly its own steps
/// in order. Note that the episodes are driven one after another from this
/// single task; only their lifetimes overlap.
#[tokio::test]
async fn test_multiple_episodes_concurrent_buffering() -> anyhow::Result<()> {
    let batching = BatchConfig {
        max_batch_size: 20,
        flush_interval_ms: 5000,
        auto_flush: true,
    };
    let memory = setup_memory_with_config(MemoryConfig {
        batch_config: Some(batching),
        quality_threshold: 0.0,
        ..Default::default()
    });

    // Open all three episodes before logging anything.
    let mut episode_ids = Vec::with_capacity(3);
    for i in 0..3 {
        let context = ContextBuilder::new(format!("concurrent-test-{i}"))
            .language("rust")
            .build();
        let description = format!("Concurrent task {i}");
        episode_ids.push(
            memory
                .start_episode(description, context, TaskType::Testing)
                .await,
        );
    }

    // Give each episode a different number of steps so mix-ups are detectable.
    for (idx, &episode_id) in episode_ids.iter().enumerate() {
        let step_count = (idx + 1) * 3;
        for step_num in 1..=step_count {
            let action = format!("Episode {idx} Step {step_num}");
            let step = create_success_step(step_num, "test_tool", &action);
            memory.log_step(episode_id, step).await;
        }
    }

    for &episode_id in &episode_ids {
        let episode = memory.get_episode(episode_id).await?;
        assert_eq!(
            episode.steps.len(),
            0,
            "Steps should be buffered for episode {episode_id}"
        );
    }

    for &episode_id in &episode_ids {
        memory.flush_steps(episode_id).await?;
    }

    for (idx, &episode_id) in episode_ids.iter().enumerate() {
        let episode = memory.get_episode(episode_id).await?;
        let expected_steps = (idx + 1) * 3;
        assert_eq!(
            episode.steps.len(),
            expected_steps,
            "Episode {} should have {} steps (got {})",
            idx,
            expected_steps,
            episode.steps.len()
        );
        for (step_idx, step) in episode.steps.iter().enumerate() {
            assert_eq!(
                step.step_number,
                step_idx + 1,
                "Step ordering should be preserved"
            );
        }
    }
    Ok(())
}
/// Times 100 `log_step` calls without batching against 100 calls with
/// batching (`max_batch_size: 50`) followed by an explicit `flush_steps`.
///
/// Only the persisted step counts are asserted. The two durations and their
/// ratio are printed for inspection and are never compared, so this test does
/// not fail on timing.
#[tokio::test]
async fn test_batching_performance_improvement() -> anyhow::Result<()> {
    let step_count = 100;

    // Baseline: batching disabled.
    let config_no_batch = MemoryConfig {
        batch_config: None,
        quality_threshold: 0.0,
        ..Default::default()
    };
    let memory_no_batch = setup_memory_with_config(config_no_batch);
    let context1 = ContextBuilder::new("perf-no-batch")
        .language("rust")
        .build();
    let episode_id1 = memory_no_batch
        .start_episode(
            "Performance test - no batching".to_string(),
            context1,
            TaskType::Testing,
        )
        .await;
    let start_no_batch = Instant::now();
    for i in 1..=step_count {
        let step = ExecutionStep::new(i, "tool".to_string(), format!("Action {i}"));
        memory_no_batch.log_step(episode_id1, step).await;
    }
    let duration_no_batch = start_no_batch.elapsed();

    // Batched run; the measured span includes the final manual flush.
    let config_batch = MemoryConfig {
        batch_config: Some(BatchConfig {
            max_batch_size: 50,
            flush_interval_ms: 60000,
            auto_flush: true,
        }),
        quality_threshold: 0.0,
        ..Default::default()
    };
    let memory_batch = setup_memory_with_config(config_batch);
    let context2 = ContextBuilder::new("perf-batch").language("rust").build();
    let episode_id2 = memory_batch
        .start_episode(
            "Performance test - with batching".to_string(),
            context2,
            TaskType::Testing,
        )
        .await;
    let start_batch = Instant::now();
    for i in 1..=step_count {
        let step = ExecutionStep::new(i, "tool".to_string(), format!("Action {i}"));
        memory_batch.log_step(episode_id2, step).await;
    }
    memory_batch.flush_steps(episode_id2).await?;
    let duration_batch = start_batch.elapsed();

    let episode1 = memory_no_batch.get_episode(episode_id1).await?;
    let episode2 = memory_batch.get_episode(episode_id2).await?;
    assert_eq!(
        episode1.steps.len(),
        step_count,
        "No-batch should have all steps"
    );
    assert_eq!(
        episode2.steps.len(),
        step_count,
        "Batch should have all steps"
    );

    println!("\n=== Performance Comparison ===");
    println!("Without batching: {duration_no_batch:?} ({step_count} steps)");
    println!("With batching: {duration_batch:?} ({step_count} steps)");
    // `as_secs_f64` gives the ratio without `u128 as f64` casts, so no
    // `clippy::cast_precision_loss` allowance is needed on this test.
    println!(
        "Speedup: {:.2}x",
        duration_no_batch.as_secs_f64() / duration_batch.as_secs_f64()
    );
    Ok(())
}
/// Logs 27 steps with `max_batch_size: 5`, flushes manually once at the end,
/// and expects all 27 to be stored with sequential `step_number`s and the
/// matching `"Action N"` text.
#[tokio::test]
async fn test_no_data_loss_on_flush() -> anyhow::Result<()> {
    let batching = BatchConfig {
        max_batch_size: 5,
        flush_interval_ms: 60000,
        auto_flush: true,
    };
    let memory = setup_memory_with_config(MemoryConfig {
        batch_config: Some(batching),
        quality_threshold: 0.0,
        ..Default::default()
    });
    let context = ContextBuilder::new("data-integrity-test")
        .language("rust")
        .build();
    let episode_id = memory
        .start_episode(
            "Test data integrity".to_string(),
            context,
            TaskType::Testing,
        )
        .await;

    // 27 is not a multiple of the batch size, so presumably the closing
    // `flush_steps` call has a partial batch left to persist.
    let total_steps = 27;
    for i in 1..=total_steps {
        let action = format!("Action {i}");
        memory
            .log_step(episode_id, create_success_step(i, "test_tool", &action))
            .await;
    }
    memory.flush_steps(episode_id).await?;

    let stored = memory.get_episode(episode_id).await?;
    assert_eq!(
        stored.steps.len(),
        total_steps,
        "All steps should be preserved across multiple flushes"
    );
    for (idx, step) in stored.steps.iter().enumerate() {
        let expected_number = idx + 1;
        assert_eq!(
            step.step_number,
            expected_number,
            "Step numbering should be sequential"
        );
        assert_eq!(
            step.action,
            format!("Action {}", expected_number),
            "Step content should match"
        );
    }
    Ok(())
}
/// Calls `flush_steps` on an episode that has had no steps logged and expects
/// the call to succeed and the episode to still report zero steps.
#[tokio::test]
async fn test_flush_empty_buffer() -> anyhow::Result<()> {
    let memory = setup_memory_with_config(MemoryConfig {
        batch_config: Some(BatchConfig::default()),
        quality_threshold: 0.0,
        ..Default::default()
    });
    let episode_id = memory
        .start_episode(
            "Test empty flush".to_string(),
            TaskContext::default(),
            TaskType::Testing,
        )
        .await;

    // Nothing was logged; any flush error propagates through `?`.
    memory.flush_steps(episode_id).await?;

    let stored = memory.get_episode(episode_id).await?;
    assert_eq!(stored.steps.len(), 0, "Should have no steps");
    Ok(())
}
/// Calls `flush_steps` with a freshly generated UUID that was never passed to
/// `start_episode`, and expects the call to return `Ok`.
#[tokio::test]
async fn test_flush_nonexistent_episode() -> anyhow::Result<()> {
    let memory = setup_memory_with_config(MemoryConfig {
        batch_config: Some(BatchConfig::default()),
        quality_threshold: 0.0,
        ..Default::default()
    });

    // A random v4 id that this memory instance has never seen.
    let unknown_id = uuid::Uuid::new_v4();
    let flush_outcome = memory.flush_steps(unknown_id).await;
    assert!(
        flush_outcome.is_ok(),
        "Flushing non-existent episode should not error"
    );
    Ok(())
}
/// Checks that `auto_flush: false` keeps steps buffered even after both
/// configured thresholds have been passed.
///
/// Fifteen steps are logged (above `max_batch_size: 10`), the test sleeps
/// 1200 ms (above `flush_interval_ms: 1000`), and a sixteenth step is logged.
/// Nothing is expected to be persisted until `flush_steps` is called, after
/// which all sixteen steps are expected.
#[tokio::test]
async fn test_manual_flush_only_mode() -> anyhow::Result<()> {
    let batching = BatchConfig {
        max_batch_size: 10,
        flush_interval_ms: 1000,
        auto_flush: false,
    };
    let memory = setup_memory_with_config(MemoryConfig {
        batch_config: Some(batching),
        quality_threshold: 0.0,
        ..Default::default()
    });
    let context = ContextBuilder::new("manual-only-test")
        .language("rust")
        .build();
    let episode_id = memory
        .start_episode(
            "Test manual-only mode".to_string(),
            context,
            TaskType::Testing,
        )
        .await;

    // Go past the size threshold.
    for i in 1..=15 {
        let action = format!("Action {i}");
        memory
            .log_step(episode_id, create_success_step(i, "test_tool", &action))
            .await;
    }

    // Go past the time threshold, then log once more.
    sleep(Duration::from_millis(1200)).await;
    let late_step = create_success_step(16, "test_tool", "Action 16");
    memory.log_step(episode_id, late_step).await;

    let before_flush = memory.get_episode(episode_id).await?;
    assert_eq!(
        before_flush.steps.len(),
        0,
        "With auto_flush=false, no steps should be auto-flushed"
    );

    memory.flush_steps(episode_id).await?;
    let after_flush = memory.get_episode(episode_id).await?;
    assert_eq!(
        after_flush.steps.len(),
        16,
        "Manual flush should persist all buffered steps"
    );
    Ok(())
}
/// Pins the field values of the `high_frequency`, `low_frequency` and
/// `manual_only` presets, then runs the high-frequency preset end to end.
///
/// In the end-to-end part, twenty steps are logged and the test expects all
/// twenty to be persisted without a manual flush.
#[tokio::test]
async fn test_batch_config_presets() -> anyhow::Result<()> {
    // Values each preset constructor is expected to return.
    let high_freq = BatchConfig::high_frequency();
    assert_eq!(high_freq.max_batch_size, 20);
    assert_eq!(high_freq.flush_interval_ms, 2000);
    assert!(high_freq.auto_flush);

    let low_freq = BatchConfig::low_frequency();
    assert_eq!(low_freq.max_batch_size, 100);
    assert_eq!(low_freq.flush_interval_ms, 10000);
    assert!(low_freq.auto_flush);

    let manual = BatchConfig::manual_only();
    assert_eq!(manual.max_batch_size, usize::MAX);
    assert!(!manual.auto_flush);

    // Use one preset in a real memory instance: 20 steps equals the
    // high-frequency preset's `max_batch_size` asserted above.
    let memory = setup_memory_with_config(MemoryConfig {
        batch_config: Some(BatchConfig::high_frequency()),
        quality_threshold: 0.0,
        ..Default::default()
    });
    let episode_id = memory
        .start_episode(
            "Test preset".to_string(),
            TaskContext::default(),
            TaskType::Testing,
        )
        .await;
    for i in 1..=20 {
        let logged = ExecutionStep::new(i, "tool".to_string(), format!("Action {i}"));
        memory.log_step(episode_id, logged).await;
    }

    let stored = memory.get_episode(episode_id).await?;
    assert_eq!(
        stored.steps.len(),
        20,
        "High frequency preset should auto-flush at 20 steps"
    );
    Ok(())
}