use async_trait::async_trait;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use thread_flow::incremental::{
graph::{DependencyGraph, GraphError},
storage::{InMemoryStorage, StorageBackend, StorageError},
types::{AnalysisDefFingerprint, DependencyEdge, DependencyType},
};
#[derive(Debug, Clone, Copy, PartialEq)]
enum CorruptionMode {
None,
CorruptFingerprint,
InvalidGraph,
PartialWrite,
}
#[derive(Debug, Clone)]
struct ErrorConfig {
fail_on_save: bool,
fail_on_load: bool,
fail_on_transaction: bool,
corruption_mode: CorruptionMode,
fail_after_ops: usize,
simulate_conflict: bool,
}
impl Default for ErrorConfig {
fn default() -> Self {
Self {
fail_on_save: false,
fail_on_load: false,
fail_on_transaction: false,
corruption_mode: CorruptionMode::None,
fail_after_ops: 0,
simulate_conflict: false,
}
}
}
#[derive(Debug)]
struct FailingStorage {
inner: InMemoryStorage,
config: Arc<ErrorConfig>,
op_counter: AtomicUsize,
corrupted: AtomicBool,
}
impl FailingStorage {
fn new(config: ErrorConfig) -> Self {
Self {
inner: InMemoryStorage::new(),
config: Arc::new(config),
op_counter: AtomicUsize::new(0),
corrupted: AtomicBool::new(false),
}
}
fn new_failing_save() -> Self {
Self::new(ErrorConfig {
fail_on_save: true,
..Default::default()
})
}
fn new_failing_load() -> Self {
Self::new(ErrorConfig {
fail_on_load: true,
..Default::default()
})
}
fn new_corrupted_fingerprint() -> Self {
Self::new(ErrorConfig {
corruption_mode: CorruptionMode::CorruptFingerprint,
..Default::default()
})
}
fn new_invalid_graph() -> Self {
Self::new(ErrorConfig {
corruption_mode: CorruptionMode::InvalidGraph,
..Default::default()
})
}
fn new_partial_write() -> Self {
Self::new(ErrorConfig {
corruption_mode: CorruptionMode::PartialWrite,
..Default::default()
})
}
fn new_conflict() -> Self {
Self::new(ErrorConfig {
simulate_conflict: true,
..Default::default()
})
}
fn new_fail_after(ops: usize) -> Self {
Self::new(ErrorConfig {
fail_after_ops: ops,
..Default::default()
})
}
fn should_fail(&self) -> bool {
let count = self.op_counter.fetch_add(1, Ordering::SeqCst);
if self.config.fail_after_ops > 0 && count >= self.config.fail_after_ops {
return true;
}
false
}
fn mark_corrupted(&self) {
self.corrupted.store(true, Ordering::SeqCst);
}
fn is_corrupted(&self) -> bool {
self.corrupted.load(Ordering::SeqCst)
}
}
#[async_trait]
impl StorageBackend for FailingStorage {
async fn save_fingerprint(
&self,
file_path: &Path,
fingerprint: &AnalysisDefFingerprint,
) -> Result<(), StorageError> {
if self.config.fail_on_save || self.should_fail() {
return Err(StorageError::Backend("Simulated save failure".to_string()));
}
if self.config.corruption_mode == CorruptionMode::PartialWrite {
self.mark_corrupted();
return Err(StorageError::Backend("Partial write detected".to_string()));
}
self.inner.save_fingerprint(file_path, fingerprint).await
}
async fn load_fingerprint(
&self,
file_path: &Path,
) -> Result<Option<AnalysisDefFingerprint>, StorageError> {
if self.config.fail_on_load || self.should_fail() {
return Err(StorageError::Backend("Simulated load failure".to_string()));
}
if self.config.corruption_mode == CorruptionMode::CorruptFingerprint {
return Err(StorageError::Corruption(format!(
"Corrupted fingerprint data for {}",
file_path.display()
)));
}
if self.is_corrupted() {
return Err(StorageError::Corruption(
"Storage in corrupted state".to_string(),
));
}
self.inner.load_fingerprint(file_path).await
}
async fn delete_fingerprint(&self, file_path: &Path) -> Result<bool, StorageError> {
if self.should_fail() {
return Err(StorageError::Backend(
"Simulated delete failure".to_string(),
));
}
self.inner.delete_fingerprint(file_path).await
}
async fn save_edge(&self, edge: &DependencyEdge) -> Result<(), StorageError> {
if self.config.fail_on_save || self.should_fail() {
return Err(StorageError::Backend(
"Simulated edge save failure".to_string(),
));
}
if self.config.simulate_conflict {
return Err(StorageError::Backend(
"Concurrent access conflict".to_string(),
));
}
self.inner.save_edge(edge).await
}
async fn load_edges_from(&self, file_path: &Path) -> Result<Vec<DependencyEdge>, StorageError> {
if self.config.fail_on_load || self.should_fail() {
return Err(StorageError::Backend(
"Simulated edges load failure".to_string(),
));
}
self.inner.load_edges_from(file_path).await
}
async fn load_edges_to(&self, file_path: &Path) -> Result<Vec<DependencyEdge>, StorageError> {
if self.config.fail_on_load || self.should_fail() {
return Err(StorageError::Backend(
"Simulated edges load failure".to_string(),
));
}
self.inner.load_edges_to(file_path).await
}
async fn delete_edges_for(&self, file_path: &Path) -> Result<usize, StorageError> {
if self.should_fail() {
return Err(StorageError::Backend(
"Simulated edges delete failure".to_string(),
));
}
self.inner.delete_edges_for(file_path).await
}
async fn load_full_graph(&self) -> Result<DependencyGraph, StorageError> {
if self.config.fail_on_load || self.should_fail() {
return Err(StorageError::Backend(
"Simulated graph load failure".to_string(),
));
}
if self.config.corruption_mode == CorruptionMode::InvalidGraph {
let mut graph = DependencyGraph::new();
graph.edges.push(DependencyEdge::new(
PathBuf::from("nonexistent.rs"),
PathBuf::from("also_nonexistent.rs"),
DependencyType::Import,
));
return Ok(graph);
}
self.inner.load_full_graph().await
}
async fn save_full_graph(&self, graph: &DependencyGraph) -> Result<(), StorageError> {
if self.config.fail_on_save || self.should_fail() {
return Err(StorageError::Backend(
"Simulated graph save failure".to_string(),
));
}
if self.config.fail_on_transaction {
return Err(StorageError::Backend(
"Transaction failed to start".to_string(),
));
}
self.inner.save_full_graph(graph).await
}
fn name(&self) -> &'static str {
"failing_storage"
}
}
#[tokio::test]
async fn test_storage_corrupted_fingerprint_recovery() {
let storage = FailingStorage::new_corrupted_fingerprint();
let result = storage.load_fingerprint(Path::new("test.rs")).await;
assert!(result.is_err());
match result.unwrap_err() {
StorageError::Corruption(msg) => {
assert!(msg.contains("Corrupted fingerprint"));
}
_ => panic!("Expected Corruption error"),
}
}
#[tokio::test]
async fn test_storage_invalid_graph_structure() {
let storage = FailingStorage::new_invalid_graph();
let graph = storage.load_full_graph().await;
assert!(graph.is_ok());
let graph = graph.unwrap();
let validation = graph.validate();
assert!(validation.is_err());
}
#[tokio::test]
async fn test_storage_connection_failure() {
let storage = FailingStorage::new_failing_load();
let result = storage.load_fingerprint(Path::new("test.rs")).await;
assert!(result.is_err());
match result.unwrap_err() {
StorageError::Backend(msg) => {
assert!(msg.contains("Simulated load failure"));
}
_ => panic!("Expected Backend error"),
}
}
#[tokio::test]
async fn test_storage_write_failure() {
let storage = FailingStorage::new_failing_save();
let fp = AnalysisDefFingerprint::new(b"test");
let result = storage.save_fingerprint(Path::new("test.rs"), &fp).await;
assert!(result.is_err());
match result.unwrap_err() {
StorageError::Backend(msg) => {
assert!(msg.contains("Simulated save failure"));
}
_ => panic!("Expected Backend error"),
}
}
#[tokio::test]
async fn test_storage_transaction_rollback() {
let storage = FailingStorage::new(ErrorConfig {
fail_on_transaction: true,
..Default::default()
});
let graph = DependencyGraph::new();
let result = storage.save_full_graph(&graph).await;
assert!(result.is_err());
match result.unwrap_err() {
StorageError::Backend(msg) => {
assert!(msg.contains("Transaction failed"));
}
_ => panic!("Expected Backend error"),
}
}
#[tokio::test]
async fn test_storage_concurrent_access_conflict() {
let storage = FailingStorage::new_conflict();
let edge = DependencyEdge::new(
PathBuf::from("a.rs"),
PathBuf::from("b.rs"),
DependencyType::Import,
);
let result = storage.save_edge(&edge).await;
assert!(result.is_err());
match result.unwrap_err() {
StorageError::Backend(msg) => {
assert!(msg.contains("Concurrent access conflict"));
}
_ => panic!("Expected Backend error"),
}
}
#[tokio::test]
async fn test_storage_state_recovery_after_error() {
let storage = FailingStorage::new_partial_write();
let fp = AnalysisDefFingerprint::new(b"data");
let result = storage.save_fingerprint(Path::new("test.rs"), &fp).await;
assert!(result.is_err());
let load_result = storage.load_fingerprint(Path::new("test.rs")).await;
assert!(load_result.is_err());
match load_result.unwrap_err() {
StorageError::Corruption(msg) => {
assert!(msg.contains("corrupted state"));
}
_ => panic!("Expected Corruption error"),
}
}
#[tokio::test]
async fn test_storage_fallback_to_inmemory() {
let failing = FailingStorage::new_failing_load();
let result = failing.load_full_graph().await;
assert!(result.is_err());
let fallback = InMemoryStorage::new();
let graph = fallback.load_full_graph().await;
assert!(graph.is_ok());
let fp = AnalysisDefFingerprint::new(b"test");
fallback
.save_fingerprint(Path::new("test.rs"), &fp)
.await
.unwrap();
let loaded = fallback
.load_fingerprint(Path::new("test.rs"))
.await
.unwrap();
assert!(loaded.is_some());
}
#[tokio::test]
async fn test_storage_full_reanalysis_trigger() {
let storage = FailingStorage::new(ErrorConfig {
corruption_mode: CorruptionMode::InvalidGraph,
..Default::default()
});
let graph = storage.load_full_graph().await.unwrap();
assert!(graph.validate().is_err());
let mut fresh_graph = DependencyGraph::new();
fresh_graph.add_edge(DependencyEdge::new(
PathBuf::from("a.rs"),
PathBuf::from("b.rs"),
DependencyType::Import,
));
assert!(fresh_graph.validate().is_ok());
}
#[tokio::test]
async fn test_storage_data_validation_on_load() {
let storage = InMemoryStorage::new();
let fp = AnalysisDefFingerprint::new(b"valid data");
storage
.save_fingerprint(Path::new("test.rs"), &fp)
.await
.unwrap();
let loaded = storage
.load_fingerprint(Path::new("test.rs"))
.await
.unwrap();
assert!(loaded.is_some());
let loaded_fp = loaded.unwrap();
assert!(loaded_fp.content_matches(b"valid data"));
}
#[tokio::test]
async fn test_graph_circular_dependency_detection() {
let mut graph = DependencyGraph::new();
graph.add_edge(DependencyEdge::new(
PathBuf::from("A"),
PathBuf::from("B"),
DependencyType::Import,
));
graph.add_edge(DependencyEdge::new(
PathBuf::from("B"),
PathBuf::from("C"),
DependencyType::Import,
));
graph.add_edge(DependencyEdge::new(
PathBuf::from("C"),
PathBuf::from("A"),
DependencyType::Import,
));
let files = vec![PathBuf::from("A"), PathBuf::from("B"), PathBuf::from("C")]
.into_iter()
.collect();
let result = graph.topological_sort(&files);
assert!(result.is_err());
match result.unwrap_err() {
GraphError::CyclicDependency(path) => {
let p = path.as_path();
assert!(
p == std::path::Path::new("A")
|| p == std::path::Path::new("B")
|| p == std::path::Path::new("C")
);
}
}
}
#[tokio::test]
async fn test_graph_invalid_node_references() {
let mut graph = DependencyGraph::new();
graph.edges.push(DependencyEdge::new(
PathBuf::from("ghost.rs"),
PathBuf::from("phantom.rs"),
DependencyType::Import,
));
let result = graph.validate();
assert!(result.is_err());
graph.edges.clear();
assert!(graph.validate().is_ok());
}
#[tokio::test]
async fn test_graph_orphaned_edges() {
let mut graph = DependencyGraph::new();
graph.add_edge(DependencyEdge::new(
PathBuf::from("a.rs"),
PathBuf::from("b.rs"),
DependencyType::Import,
));
graph.nodes.clear();
assert!(graph.validate().is_err());
graph.edges.clear();
assert!(graph.validate().is_ok());
}
#[tokio::test]
async fn test_graph_type_mismatches() {
let mut graph = DependencyGraph::new();
graph.add_edge(DependencyEdge::new(
PathBuf::from("a.rs"),
PathBuf::from("b.rs"),
DependencyType::Import,
));
graph.add_edge(DependencyEdge::new(
PathBuf::from("a.rs"),
PathBuf::from("b.rs"),
DependencyType::Export,
));
assert_eq!(graph.edge_count(), 2);
}
#[tokio::test]
async fn test_graph_corruption_recovery() {
let storage = FailingStorage::new_invalid_graph();
let corrupted = storage.load_full_graph().await.unwrap();
assert!(corrupted.validate().is_err());
let mut recovered = DependencyGraph::new();
recovered.add_edge(DependencyEdge::new(
PathBuf::from("valid.rs"),
PathBuf::from("dep.rs"),
DependencyType::Import,
));
assert!(recovered.validate().is_ok());
assert_eq!(recovered.node_count(), 2);
}
#[tokio::test]
async fn test_graph_consistency_validation() {
let mut graph = DependencyGraph::new();
graph.add_edge(DependencyEdge::new(
PathBuf::from("a.rs"),
PathBuf::from("b.rs"),
DependencyType::Import,
));
graph.add_edge(DependencyEdge::new(
PathBuf::from("b.rs"),
PathBuf::from("c.rs"),
DependencyType::Import,
));
assert!(graph.validate().is_ok());
graph.edges.push(DependencyEdge::new(
PathBuf::from("invalid.rs"),
PathBuf::from("missing.rs"),
DependencyType::Import,
));
assert!(graph.validate().is_err());
graph.edges.pop();
assert!(graph.validate().is_ok());
}
#[tokio::test]
async fn test_concurrency_thread_panic_recovery() {
use std::panic;
let result = panic::catch_unwind(|| {
panic!("Simulated thread panic");
});
assert!(result.is_err());
}
#[tokio::test]
async fn test_concurrency_task_cancellation() {
use tokio::time::{Duration, sleep, timeout};
let task = tokio::spawn(async {
sleep(Duration::from_secs(10)).await;
"completed"
});
let result = timeout(Duration::from_millis(100), task).await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_concurrency_tokio_runtime_failure() {
let mut handles = vec![];
for _ in 0..100 {
handles.push(tokio::spawn(async { Ok::<(), String>(()) }));
}
for handle in handles {
handle.await.unwrap().unwrap();
}
}
#[cfg(feature = "parallel")]
#[tokio::test]
async fn test_concurrency_rayon_panic_handling() {
use rayon::prelude::*;
let items: Vec<i32> = vec![1, 2, 3, 4, 5];
let result = std::panic::catch_unwind(|| {
items
.par_iter()
.map(|&x| {
if x == 3 {
panic!("Simulated panic at 3");
}
x * 2
})
.collect::<Vec<_>>()
});
assert!(result.is_err());
}
#[tokio::test]
async fn test_concurrency_deadlock_prevention() {
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::{Duration, timeout};
let lock1 = Arc::new(Mutex::new(1));
let lock2 = Arc::new(Mutex::new(2));
let lock1_clone = Arc::clone(&lock1);
let lock2_clone = Arc::clone(&lock2);
let task1 = tokio::spawn(async move {
let g1 = lock1_clone.lock().await;
tokio::time::sleep(Duration::from_millis(10)).await;
let lock2_result = timeout(Duration::from_millis(100), lock2_clone.lock()).await;
drop(g1); lock2_result.is_ok() || lock2_result.is_err()
});
let lock1_clone2 = Arc::clone(&lock1);
let lock2_clone2 = Arc::clone(&lock2);
let task2 = tokio::spawn(async move {
let g2 = lock2_clone2.lock().await;
tokio::time::sleep(Duration::from_millis(10)).await;
let lock1_result = timeout(Duration::from_millis(100), lock1_clone2.lock()).await;
drop(g2); lock1_result.is_ok() || lock1_result.is_err()
});
let result1 = task1.await;
let result2 = task2.await;
assert!(result1.is_ok());
assert!(result2.is_ok());
assert!(result1.unwrap()); assert!(result2.unwrap());
}
#[tokio::test]
async fn test_analysis_parser_failure() {
let _invalid_rust = "fn broken { incomplete syntax )))";
}
#[tokio::test]
async fn test_analysis_out_of_memory_simulation() {
let large_graph = DependencyGraph::new();
assert!(large_graph.node_count() < 1_000_000);
}
#[tokio::test]
async fn test_analysis_timeout_handling() {
use tokio::time::{Duration, sleep, timeout};
let slow_analysis = async {
sleep(Duration::from_secs(10)).await;
Ok::<(), String>(())
};
let result = timeout(Duration::from_millis(100), slow_analysis).await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_analysis_invalid_utf8_recovery() {
use std::ffi::OsStr;
use std::os::unix::ffi::OsStrExt;
#[cfg(unix)]
{
let invalid_bytes = &[0xFF, 0xFE, 0xFD];
let invalid_path = PathBuf::from(OsStr::from_bytes(invalid_bytes));
assert!(invalid_path.to_str().is_none());
}
}
#[tokio::test]
async fn test_analysis_large_file_handling() {
let large_content = "fn test() {}\n".repeat(10_000);
assert!(large_content.len() > 100_000);
}
#[tokio::test]
async fn test_analysis_resource_exhaustion() {
let storage = FailingStorage::new_fail_after(5);
for i in 0..10 {
let fp = AnalysisDefFingerprint::new(b"test");
let result = storage
.save_fingerprint(&PathBuf::from(format!("file{}.rs", i)), &fp)
.await;
if i < 5 {
assert!(result.is_ok());
} else {
assert!(result.is_err());
}
}
}
#[tokio::test]
async fn test_full_error_recovery_workflow() {
let primary = FailingStorage::new_failing_load();
let load_result = primary.load_full_graph().await;
assert!(load_result.is_err());
let fallback = InMemoryStorage::new();
let graph = fallback.load_full_graph().await;
assert!(graph.is_ok());
let fp = AnalysisDefFingerprint::new(b"content");
fallback
.save_fingerprint(Path::new("test.rs"), &fp)
.await
.unwrap();
let loaded = fallback
.load_fingerprint(Path::new("test.rs"))
.await
.unwrap();
assert!(loaded.is_some());
}
#[tokio::test]
async fn test_error_recovery_test_count() {
let storage_tests = 10; let graph_tests = 6; let concurrency_tests = 5; let analysis_tests = 6;
let total = storage_tests + graph_tests + concurrency_tests + analysis_tests;
assert_eq!(
total, 27,
"Error recovery test suite should have exactly 27 tests"
);
}