use midstream::{
TemporalComparator, Sequence, ComparisonAlgorithm,
RealtimeScheduler, SchedulingPolicy, Priority,
Action, AgentContext, AgenticLoop, LeanAgenticConfig,
};
use std::collections::HashMap;
use std::time::Duration;
/// Stores three conversation histories and checks that an LCS query with a
/// 0.7 threshold returns at least two matches, each scoring at least 0.7.
#[tokio::test]
async fn test_temporal_conversation_pattern_matching() {
    // Local shorthand: turn string literals into owned intent labels.
    fn intents(labels: &[&str]) -> Vec<String> {
        labels.iter().map(|label| String::from(*label)).collect()
    }

    let mut comparator = TemporalComparator::<String>::new();

    // Two weather conversations; the second has one extra trailing step.
    let weather_short = Sequence {
        data: intents(&[
            "greeting",
            "weather_query",
            "location_query",
            "weather_response",
        ]),
        timestamp: 1000,
        id: String::from("conv1"),
    };
    let weather_long = Sequence {
        data: intents(&[
            "greeting",
            "weather_query",
            "location_query",
            "weather_response",
            "followup",
        ]),
        timestamp: 2000,
        id: String::from("conv2"),
    };
    // A calendar conversation that shares only the opening "greeting".
    let calendar = Sequence {
        data: intents(&["greeting", "calendar_query", "calendar_response"]),
        timestamp: 3000,
        id: String::from("conv3"),
    };

    comparator.add_sequence(weather_short);
    comparator.add_sequence(weather_long);
    comparator.add_sequence(calendar);

    // The query is the first three steps of both weather conversations.
    let query = intents(&["greeting", "weather_query", "location_query"]);
    let matches = comparator.find_similar(&query, 0.7, ComparisonAlgorithm::LCS);

    assert!(matches.len() >= 2);
    println!("Found {} similar conversations", matches.len());

    // Every returned entry must respect the requested threshold.
    for (idx, score) in matches.iter() {
        println!("Conversation {idx}: similarity = {score}");
        assert!(*score >= 0.7);
    }
}
/// Compares a five-step action sequence against a copy that lacks "verify",
/// checking the LCS result lies strictly between 0.6 and 1.0 and the
/// EditDistance result is strictly positive.
#[tokio::test]
async fn test_temporal_action_sequence_analysis() {
    // Local shorthand: build an owned sequence from string literals.
    fn steps(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| (*name).to_owned()).collect()
    }

    let mut comparator = TemporalComparator::<String>::new();

    let baseline = steps(&["plan", "verify", "execute", "observe", "learn"]);
    // Same flow with the "verify" step left out.
    let missing_verify = steps(&["plan", "execute", "observe", "learn"]);

    let lcs_score = comparator.compare(&baseline, &missing_verify, ComparisonAlgorithm::LCS);
    println!("Similarity between normal and anomalous: {}", lcs_score);
    assert!(lcs_score > 0.6);
    assert!(lcs_score < 1.0);

    let edit_score = comparator.compare(
        &baseline,
        &missing_verify,
        ComparisonAlgorithm::EditDistance,
    );
    println!("Edit distance: {}", edit_score);
    assert!(edit_score > 0.0);
}
/// Schedules a Critical action and a Medium action on an
/// EarliestDeadlineFirst scheduler and checks that the Critical one is
/// returned first by `next_task`.
#[tokio::test]
async fn test_scheduler_with_deadlines() {
    let scheduler = RealtimeScheduler::new(SchedulingPolicy::EarliestDeadlineFirst);

    let safety_check = Action {
        action_type: String::from("critical_response"),
        description: String::from("User safety check"),
        parameters: HashMap::new(),
        tool_calls: Vec::new(),
        expected_outcome: Some(String::from("safe")),
        expected_reward: 1.0,
    };
    let routine_query = Action {
        action_type: String::from("normal_query"),
        description: String::from("Regular information request"),
        parameters: HashMap::new(),
        tool_calls: Vec::new(),
        expected_outcome: None,
        expected_reward: 0.7,
    };

    // NOTE(review): the two durations are presumably (deadline, estimated
    // execution time) — confirm against `RealtimeScheduler::schedule`.
    let critical_id = scheduler
        .schedule(
            safety_check,
            Priority::Critical,
            Duration::from_millis(50),
            Duration::from_millis(10),
        )
        .await;
    scheduler
        .schedule(
            routine_query,
            Priority::Medium,
            Duration::from_secs(5),
            Duration::from_millis(100),
        )
        .await;

    // The task scheduled with the 50 ms value must be handed out first.
    let first = scheduler.next_task().await.unwrap();
    assert_eq!(first.id, critical_id);
    assert_eq!(first.action.action_type, "critical_response");
    println!("Scheduler correctly prioritized critical task with tight deadline");
}
/// Schedules a Background task followed by a Critical task on a
/// FixedPriority scheduler and checks that the Critical task is returned
/// first even though it was submitted second.
#[tokio::test]
async fn test_scheduler_priority_override() {
    let scheduler = RealtimeScheduler::new(SchedulingPolicy::FixedPriority);

    let background = Action {
        action_type: String::from("background_task"),
        description: String::from("Background processing"),
        parameters: HashMap::new(),
        tool_calls: Vec::new(),
        expected_outcome: None,
        expected_reward: 0.3,
    };
    let urgent = Action {
        action_type: String::from("urgent_task"),
        description: String::from("Urgent response needed"),
        parameters: HashMap::new(),
        tool_calls: Vec::new(),
        expected_outcome: None,
        expected_reward: 0.9,
    };

    // Both tasks share the same first duration (10 s); only priority differs
    // in a way that should affect ordering under FixedPriority.
    scheduler
        .schedule(
            background,
            Priority::Background,
            Duration::from_secs(10),
            Duration::from_millis(100),
        )
        .await;
    scheduler
        .schedule(
            urgent,
            Priority::Critical,
            Duration::from_secs(10),
            Duration::from_millis(50),
        )
        .await;

    let first = scheduler.next_task().await.unwrap();
    assert_eq!(first.action.action_type, "urgent_task");
    println!("Priority scheduling correctly prioritized critical task");
}
/// Uses a temporal pattern match to decide whether to schedule a follow-up
/// action, then checks that the scheduler reports exactly one scheduled task.
///
/// The stored pattern has four steps; the query is its first two steps and is
/// matched with LCS at a 0.5 threshold.
#[tokio::test]
async fn test_combined_temporal_and_scheduling() {
    let mut comparator = TemporalComparator::<String>::new();
    let scheduler = RealtimeScheduler::new(SchedulingPolicy::EarliestDeadlineFirst);

    // Reference pattern that the partial sequence below is compared against.
    comparator.add_sequence(Sequence {
        data: vec![
            "user_query".to_string(),
            "context_check".to_string(),
            "knowledge_lookup".to_string(),
            "response".to_string(),
        ],
        timestamp: 1000,
        id: "good_pattern".to_string(),
    });

    // First two steps of the stored pattern.
    let current = vec!["user_query".to_string(), "context_check".to_string()];
    // The borrow of `current` had been corrupted into a stray `¤` character
    // (a mis-decoded `&curren` entity), which does not compile.
    let similar = comparator.find_similar(&current, 0.5, ComparisonAlgorithm::LCS);

    // Only schedule the predicted next step when a stored pattern matched.
    if !similar.is_empty() {
        println!("Found similar successful pattern, scheduling with high priority");
        scheduler
            .schedule(
                Action {
                    action_type: "knowledge_lookup".to_string(),
                    description: "Predicted next action from pattern".to_string(),
                    parameters: HashMap::new(),
                    tool_calls: vec![],
                    expected_outcome: Some("success".to_string()),
                    expected_reward: 0.85,
                },
                Priority::High,
                Duration::from_millis(100),
                Duration::from_millis(20),
            )
            .await;
    }

    // A count of 0 here means `find_similar` returned nothing and the branch
    // above was skipped.
    let stats = scheduler.get_stats().await;
    assert_eq!(
        stats.total_scheduled, 1,
        "expected exactly one scheduled task; no task is scheduled when no similar pattern is found"
    );
    println!("Successfully combined temporal pattern matching with scheduling");
}
/// Checks `can_meet_deadline` on an empty scheduler and again after 50
/// Medium tasks have been queued: a 100 ms second argument must then be
/// rejected while a 10 s one is still accepted.
#[tokio::test]
async fn test_scheduler_deadline_checking() {
    let scheduler = RealtimeScheduler::new(SchedulingPolicy::EarliestDeadlineFirst);

    // NOTE(review): arguments are presumably (estimated duration, deadline)
    // — confirm against `RealtimeScheduler::can_meet_deadline`.
    let feasible_when_idle = scheduler
        .can_meet_deadline(Duration::from_millis(10), Duration::from_secs(1))
        .await;
    assert!(feasible_when_idle);

    // Fill the queue with 50 tasks, each scheduled with a 50 ms final argument.
    for task_no in 0..50 {
        let queued = Action {
            action_type: format!("task_{task_no}"),
            description: format!("Task {task_no}"),
            parameters: HashMap::new(),
            tool_calls: Vec::new(),
            expected_outcome: None,
            expected_reward: 0.7,
        };
        scheduler
            .schedule(
                queued,
                Priority::Medium,
                Duration::from_secs(10),
                Duration::from_millis(50),
            )
            .await;
    }

    let feasible_tight = scheduler
        .can_meet_deadline(Duration::from_millis(10), Duration::from_millis(100))
        .await;
    assert!(!feasible_tight);

    let feasible_loose = scheduler
        .can_meet_deadline(Duration::from_millis(10), Duration::from_secs(10))
        .await;
    assert!(feasible_loose);

    println!("Deadline checking correctly estimates feasibility");
}
/// Runs the same DTW comparison twice and one LCS comparison, checking that
/// the repeated call returns an equal result and that `cache_stats` reports
/// one DTW entry, one LCS entry and two comparisons in total.
#[tokio::test]
async fn test_temporal_caching() {
    let mut comparator = TemporalComparator::<i32>::new();

    let base: Vec<i32> = (0..100).collect();
    // Same length as `base`, every element shifted up by one.
    let shifted: Vec<i32> = (1..=100).collect();

    let first_dtw = comparator.compare(&base, &shifted, ComparisonAlgorithm::DTW);
    let second_dtw = comparator.compare(&base, &shifted, ComparisonAlgorithm::DTW);
    assert_eq!(first_dtw, second_dtw);

    // The repeated DTW call must not have been counted a second time.
    let after_dtw = comparator.cache_stats();
    println!("Cache stats: {:?}", after_dtw);
    assert_eq!(after_dtw.dtw_count, 1);

    let _lcs = comparator.compare(&base, &shifted, ComparisonAlgorithm::LCS);

    let after_lcs = comparator.cache_stats();
    assert_eq!(after_lcs.lcs_count, 1);
    assert_eq!(after_lcs.total_comparisons, 2);
    println!(
        "Caching working correctly: {} total comparisons",
        after_lcs.total_comparisons
    );
}
/// Searches a 12-element intent stream for the pattern
/// weather → location → weather and checks the reported positions.
///
/// The pattern starts at indices 0, 5 and 9 of the stream; the test only
/// asserts that 0 and 9 are reported.
#[tokio::test]
async fn test_pattern_detection_in_stream() {
    let comparator = TemporalComparator::<String>::new();

    let intent_stream: Vec<String> = [
        "weather", "location", "weather", "news", "sports", "weather", "location", "weather",
        "calendar", "weather", "location", "weather",
    ]
    .iter()
    .map(|intent| String::from(*intent))
    .collect();

    let pattern: Vec<String> = ["weather", "location", "weather"]
        .iter()
        .map(|intent| String::from(*intent))
        .collect();

    let positions = comparator.detect_pattern(&intent_stream, &pattern);
    println!("Found pattern at positions: {:?}", positions);

    assert!(!positions.is_empty());
    assert!(positions.contains(&0));
    assert!(positions.contains(&9));

    let occurrences = positions.len();
    println!("Successfully detected {} pattern occurrences in stream", occurrences);
}
/// Schedules ten tasks, marks each one executed with a reported duration of
/// 100·(i+1) µs, and checks the scheduled/executed counters and the latency
/// fields exposed by `get_stats`.
#[tokio::test]
async fn test_scheduler_stats_tracking() {
    let scheduler = RealtimeScheduler::new(SchedulingPolicy::EarliestDeadlineFirst);

    for n in 0..10 {
        let action = Action {
            action_type: format!("task_{n}"),
            description: format!("Task {n}"),
            parameters: HashMap::new(),
            tool_calls: Vec::new(),
            expected_outcome: None,
            expected_reward: 0.7,
        };
        // Reported durations grow linearly from 100 µs to 1000 µs.
        let reported = Duration::from_micros(100 * (n + 1));

        let task_id = scheduler
            .schedule(
                action,
                Priority::Medium,
                Duration::from_secs(1),
                Duration::from_millis(10),
            )
            .await;
        scheduler.mark_executed(task_id, reported).await;
    }

    let stats = scheduler.get_stats().await;
    assert_eq!(stats.total_scheduled, 10);
    assert_eq!(stats.total_executed, 10);
    assert!(stats.average_latency_ns > 0);
    assert!(stats.max_latency_ns >= stats.min_latency_ns);

    println!("Scheduler stats: {:?}", stats);
    println!("Average latency: {} μs", stats.average_latency_ns / 1000);
}
/// End-to-end flow: match a partial conversation against a stored pattern,
/// schedule the two remaining steps of that pattern, then drain the scheduler
/// and check that exactly two tasks were executed.
#[tokio::test]
async fn test_real_world_conversation_flow() {
    let mut comparator = TemporalComparator::<String>::new();
    let scheduler = RealtimeScheduler::new(SchedulingPolicy::EarliestDeadlineFirst);

    // Stored four-step pattern used as the reference.
    comparator.add_sequence(Sequence {
        data: vec![
            "greeting".to_string(),
            "clarification".to_string(),
            "action".to_string(),
            "confirmation".to_string(),
        ],
        timestamp: 1000,
        id: "success_pattern".to_string(),
    });

    // The conversation so far: the first two steps of the stored pattern.
    let current_flow = vec!["greeting".to_string(), "clarification".to_string()];
    // The borrow of `current_flow` had been corrupted into a stray `¤`
    // character (a mis-decoded `&curren` entity), which does not compile.
    let similar = comparator.find_similar(&current_flow, 0.6, ComparisonAlgorithm::LCS);

    // Schedule the remaining steps only when a stored pattern matched.
    if !similar.is_empty() {
        scheduler
            .schedule(
                Action {
                    action_type: "action".to_string(),
                    description: "Execute predicted action from pattern".to_string(),
                    parameters: HashMap::new(),
                    tool_calls: vec![],
                    expected_outcome: Some("confirmation".to_string()),
                    expected_reward: 0.8,
                },
                Priority::High,
                Duration::from_millis(200),
                Duration::from_millis(50),
            )
            .await;
        scheduler
            .schedule(
                Action {
                    action_type: "confirmation".to_string(),
                    description: "Confirm action completion".to_string(),
                    parameters: HashMap::new(),
                    tool_calls: vec![],
                    expected_outcome: Some("success".to_string()),
                    expected_reward: 0.9,
                },
                Priority::Medium,
                Duration::from_millis(500),
                Duration::from_millis(30),
            )
            .await;
        println!("Scheduled actions based on historical success pattern");
    }

    // Drain the queue, reporting a fixed 10 ms duration for every task.
    let mut executed_count = 0;
    while let Some(task) = scheduler.next_task().await {
        println!("Executing: {}", task.action.action_type);
        scheduler
            .mark_executed(task.id, Duration::from_millis(10))
            .await;
        executed_count += 1;
    }

    // A count of 0 here means `find_similar` returned nothing and no task
    // was ever scheduled.
    assert_eq!(
        executed_count, 2,
        "expected both scheduled tasks to be executed; none are scheduled when no similar pattern is found"
    );
    println!("Successfully completed conversation flow based on patterns");
}