use bytes::Bytes;
use guts_p2p::{Message, ObjectData, RefUpdate, RepoAnnounce, SyncRequest};
use guts_storage::{GitObject, ObjectId, ObjectStore, ObjectType};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
/// An `ObjectStore` wrapper that injects deterministic read/write failures,
/// used to exercise error-handling and retry paths in tests.
pub struct FailableObjectStore {
    // The real store that successful operations are forwarded to.
    inner: ObjectStore,
    // Percentage (0..=100) of reads that should fail.
    read_failure_rate: AtomicUsize,
    // Percentage (0..=100) of writes that should fail.
    write_failure_rate: AtomicUsize,
    // When true, every operation fails regardless of the rates.
    fail_all: AtomicBool,
    // Counter driving the deterministic failure pattern; only incremented
    // when an operation goes through the probabilistic check (see
    // `should_fail`), not when short-circuited by `fail_all` or a 0/100 rate.
    operation_count: AtomicUsize,
}
impl FailableObjectStore {
    /// Creates a wrapper around a fresh `ObjectStore` with all failure
    /// injection disabled.
    pub fn new() -> Self {
        Self {
            inner: ObjectStore::new(),
            read_failure_rate: AtomicUsize::new(0),
            write_failure_rate: AtomicUsize::new(0),
            fail_all: AtomicBool::new(false),
            operation_count: AtomicUsize::new(0),
        }
    }

    /// Sets the percentage of reads (clamped to 0..=100) that should fail.
    pub fn set_read_failure_rate(&self, rate: usize) {
        self.read_failure_rate.store(rate.min(100), Ordering::SeqCst);
    }

    /// Sets the percentage of writes (clamped to 0..=100) that should fail.
    pub fn set_write_failure_rate(&self, rate: usize) {
        self.write_failure_rate.store(rate.min(100), Ordering::SeqCst);
    }

    /// When `fail` is true, every subsequent operation fails unconditionally.
    pub fn set_fail_all(&self, fail: bool) {
        self.fail_all.store(fail, Ordering::SeqCst);
    }

    /// Number of operations that went through the probabilistic failure
    /// check. Operations short-circuited by `fail_all` or a 0/100 rate are
    /// not counted.
    pub fn operation_count(&self) -> usize {
        self.operation_count.load(Ordering::SeqCst)
    }

    /// Decides whether the current operation should fail under `rate`.
    ///
    /// Failures are deterministic rather than random: 97 is coprime with
    /// 100, so `(op * 97) % 100` visits every residue over any window of
    /// 100 checked operations — exactly `rate` of them fail.
    fn should_fail(&self, rate: usize) -> bool {
        if self.fail_all.load(Ordering::SeqCst) {
            return true;
        }
        match rate {
            0 => false,
            r if r >= 100 => true,
            r => {
                let op = self.operation_count.fetch_add(1, Ordering::SeqCst);
                (op * 97) % 100 < r
            }
        }
    }

    /// Stores `object` in the inner store, or returns an injected error
    /// according to the configured write failure rate.
    pub fn put(&self, object: GitObject) -> Result<ObjectId, String> {
        let rate = self.write_failure_rate.load(Ordering::SeqCst);
        if self.should_fail(rate) {
            Err("Injected write failure".to_string())
        } else {
            Ok(self.inner.put(object))
        }
    }

    /// Fetches the object for `id`, or returns an injected error according
    /// to the configured read failure rate.
    pub fn get(&self, id: &ObjectId) -> Result<GitObject, String> {
        let rate = self.read_failure_rate.load(Ordering::SeqCst);
        if self.should_fail(rate) {
            Err("Injected read failure".to_string())
        } else {
            self.inner.get(id).map_err(|e| e.to_string())
        }
    }

    /// Direct access to the wrapped store, bypassing failure injection.
    pub fn inner(&self) -> &ObjectStore {
        &self.inner
    }
}
impl Default for FailableObjectStore {
fn default() -> Self {
Self::new()
}
}
#[test]
fn test_read_failure_handling() {
    let store = FailableObjectStore::new();

    // Sanity check: with no injected failures a round trip works.
    let id = store
        .put(GitObject::blob(b"test content".to_vec()))
        .expect("First write should succeed");
    let obj = store.get(&id).expect("First read should succeed");
    assert_eq!(obj.data.as_ref(), b"test content");

    // At a 50% read failure rate, 100 reads should split roughly evenly.
    store.set_read_failure_rate(50);
    let outcomes: Vec<bool> = (0..100).map(|_| store.get(&id).is_ok()).collect();
    let successes = outcomes.iter().filter(|&&ok| ok).count();
    let failures = outcomes.len() - successes;
    assert!(successes > 30, "Should have some successes");
    assert!(failures > 30, "Should have some failures");
}
#[test]
fn test_write_failure_handling() {
    let store = FailableObjectStore::new();
    // At a 50% write failure rate, 100 writes should split roughly evenly.
    store.set_write_failure_rate(50);

    let results: Vec<_> = (0..100)
        .map(|i| store.put(GitObject::blob(format!("content {}", i).into_bytes())))
        .collect();
    let successes = results.iter().filter(|r| r.is_ok()).count();
    let failures = results.len() - successes;
    assert!(successes > 30, "Should have some successes");
    assert!(failures > 30, "Should have some failures");
}
#[test]
fn test_complete_failure_mode() {
    let store = FailableObjectStore::new();

    // Store an object while the store is healthy.
    let id1 = store
        .put(GitObject::blob(b"content 1".to_vec()))
        .expect("Should succeed before failure mode");

    // With fail_all set, every operation must error.
    store.set_fail_all(true);
    assert!(
        store.put(GitObject::blob(b"content 2".to_vec())).is_err(),
        "Write should fail"
    );
    assert!(store.get(&id1).is_err(), "Read should fail");

    // Clearing the flag restores normal behavior and the data is intact.
    store.set_fail_all(false);
    let obj = store
        .get(&id1)
        .expect("Should succeed after disabling failure mode");
    assert_eq!(obj.data.as_ref(), b"content 1");
}
#[test]
fn test_intermittent_failure_recovery() {
    let store = FailableObjectStore::new();
    // Low failure rate: individual writes occasionally fail, but a short
    // retry loop should always manage to store each object.
    store.set_write_failure_rate(10);

    let max_attempts = 10;
    let mut stored_ids = Vec::new();
    for i in 0..50 {
        let blob = GitObject::blob(format!("content {}", i).into_bytes());
        let mut attempts = 0;
        loop {
            attempts += 1;
            match store.put(blob.clone()) {
                Ok(id) => {
                    stored_ids.push(id);
                    break;
                }
                Err(e) if attempts >= max_attempts => {
                    panic!("Failed after {} attempts: {}", attempts, e);
                }
                Err(_) => {} // retry
            }
        }
    }

    // With read failures disabled, every stored object must be readable.
    store.set_read_failure_rate(0);
    for id in &stored_ids {
        assert!(store.get(id).is_ok(), "Object should be retrievable");
    }
    assert_eq!(
        stored_ids.len(),
        50,
        "Should have stored all objects with retries"
    );
}
#[test]
fn test_message_processing_with_storage_failures() {
    let store = FailableObjectStore::new();
    let objects: Vec<GitObject> = (0..10)
        .map(|i| GitObject::blob(format!("object {}", i).into_bytes()))
        .collect();

    // With a 20% write failure rate, most (but not necessarily all) puts
    // succeed; the test only requires that some data got through.
    store.set_write_failure_rate(20);
    let (stored, failed) = objects
        .iter()
        .fold((0, 0), |(ok, err), obj| match store.put(obj.clone()) {
            Ok(_) => (ok + 1, err),
            Err(_) => (ok, err + 1),
        });
    println!("Stored {} objects, {} failures", stored, failed);
    assert!(stored > 0, "Should have stored some objects");
}
#[test]
fn test_ref_update_with_failures() {
    // Twenty ref updates with distinct repo keys and object ids.
    let updates: Vec<RefUpdate> = (0..20)
        .map(|i| RefUpdate {
            repo_key: format!("user/repo{}", i),
            ref_name: "refs/heads/main".to_string(),
            old_id: ObjectId::from_bytes([i as u8; 20]),
            new_id: ObjectId::from_bytes([(i + 1) as u8; 20]),
        })
        .collect();

    let mut processed = 0;
    let mut decode_failures = 0;
    let mut _wrong_payloads = 0;
    for (idx, update) in updates.iter().enumerate() {
        let encoded = update.encode();
        // Every fifth message is truncated to simulate wire corruption.
        let wire_bytes = if idx % 5 == 4 {
            encoded[..encoded.len().saturating_sub(5)].to_vec()
        } else {
            encoded.to_vec()
        };
        match Message::decode(&wire_bytes) {
            Ok(Message::RefUpdate(decoded)) if decoded.repo_key == update.repo_key => {
                processed += 1;
            }
            Ok(_) => _wrong_payloads += 1,
            Err(_) => decode_failures += 1,
        }
    }

    // 16 of 20 messages were left intact, so at least 15 must round-trip.
    assert!(
        processed >= 15,
        "Most updates should process successfully, got {}",
        processed
    );
    assert!(
        decode_failures > 0,
        "Should have some encoding errors from corruption"
    );
}
#[test]
fn test_rapid_object_creation() {
    let store = ObjectStore::new();
    let object_count = 10000;

    // Time a burst of small-blob insertions.
    let start = std::time::Instant::now();
    (0..object_count).for_each(|i| {
        store.put(GitObject::blob(format!("rapid object {}", i).into_bytes()));
    });
    let duration = start.elapsed();

    assert_eq!(store.len(), object_count, "All objects should be stored");
    // Loose upper bound — this is a smoke test, not a benchmark.
    assert!(
        duration.as_secs() < 5,
        "Rapid creation took too long: {:?}",
        duration
    );
    println!(
        "Created {} objects in {:?} ({:.0} objects/sec)",
        object_count,
        duration,
        object_count as f64 / duration.as_secs_f64()
    );
}
#[test]
fn test_large_object_handling() {
    let store = ObjectStore::new();
    // Round-trip blobs of 1, 5 and 10 MiB and check the payload size survives.
    for &size_mb in &[1usize, 5, 10] {
        let size = size_mb * 1024 * 1024;
        let payload: Vec<u8> = (0..size).map(|i| (i % 256) as u8).collect();
        let id = store.put(GitObject::blob(payload));
        let retrieved = store.get(&id).expect("Should retrieve large object");
        assert_eq!(retrieved.data.len(), size, "Size should match");
    }
}
#[test]
fn test_concurrent_message_encoding() {
    use std::thread;

    let num_threads = 8;
    let iterations_per_thread = 1000;

    // Each thread encodes/decodes its own stream of announcements; any
    // cross-thread interference would surface as a decode failure or a
    // repo_key mismatch.
    let workers: Vec<_> = (0..num_threads)
        .map(|thread_id| {
            thread::spawn(move || {
                for i in 0..iterations_per_thread {
                    let msg = RepoAnnounce {
                        repo_key: format!("thread{}/repo{}", thread_id, i),
                        object_ids: (0..10)
                            .map(|j| ObjectId::from_bytes([(thread_id * 100 + j) as u8; 20]))
                            .collect(),
                        refs: vec![(
                            "refs/heads/main".to_string(),
                            ObjectId::from_bytes([thread_id as u8; 20]),
                        )],
                    };
                    let round_trip = Message::decode(&msg.encode()).expect("Should decode");
                    if let Message::RepoAnnounce(d) = round_trip {
                        assert_eq!(d.repo_key, msg.repo_key);
                    } else {
                        panic!("Wrong message type");
                    }
                }
            })
        })
        .collect();

    for worker in workers {
        worker.join().expect("Thread should complete");
    }
}
#[test]
fn test_partial_sync_processing() {
    let store = FailableObjectStore::new();

    // Populate the store and remember every id we will later request.
    let objects: Vec<GitObject> = (0..20)
        .map(|i| GitObject::blob(format!("sync object {}", i).into_bytes()))
        .collect();
    let object_ids: Vec<ObjectId> = objects
        .iter()
        .map(|obj| store.put(obj.clone()).expect("Initial store should succeed"))
        .collect();

    // Round-trip the sync request through the wire format.
    let encoded = SyncRequest {
        repo_key: "test/partial".to_string(),
        want: object_ids.clone(),
    }
    .encode();
    let req = match Message::decode(&encoded).expect("Should decode") {
        Message::SyncRequest(req) => req,
        _ => panic!("Expected SyncRequest"),
    };

    // Serve the request while half the reads fail, collecting the misses.
    store.set_read_failure_rate(50);
    let mut retrieved = Vec::new();
    let mut failed_ids = Vec::new();
    for id in &req.want {
        match store.get(id) {
            Ok(obj) => retrieved.push(obj),
            Err(_) => failed_ids.push(*id),
        }
    }
    println!(
        "Retrieved: {}, Failed: {}",
        retrieved.len(),
        failed_ids.len()
    );

    // A retry pass with failures disabled recovers everything missed.
    store.set_read_failure_rate(0);
    for id in &failed_ids {
        retrieved.push(store.get(id).expect("Retry should succeed"));
    }
    assert_eq!(
        retrieved.len(),
        object_ids.len(),
        "All objects should be retrieved after retry"
    );
}
#[test]
fn test_partial_object_data_validity() {
    // A mix of edge-case payloads: normal, empty, large, and non-blob types.
    let objects = vec![
        GitObject::blob(b"normal content".to_vec()),
        GitObject::blob(Vec::new()),
        GitObject::blob(vec![0u8; 10000]),
        GitObject::new(ObjectType::Tree, Bytes::new()),
        GitObject::new(ObjectType::Commit, Bytes::from("tree abc\n")),
    ];

    let msg = ObjectData {
        repo_key: "test/partial-validity".to_string(),
        objects: objects.clone(),
    };
    let data = match Message::decode(&msg.encode()).expect("Should decode") {
        Message::ObjectData(data) => data,
        _ => panic!("Expected ObjectData"),
    };

    assert_eq!(
        data.objects.len(),
        objects.len(),
        "All objects should be preserved"
    );
    // Every object must survive the round trip field-for-field.
    for (orig, round_trip) in objects.iter().zip(data.objects.iter()) {
        assert_eq!(orig.id, round_trip.id, "Object ID should match");
        assert_eq!(
            orig.object_type, round_trip.object_type,
            "Object type should match"
        );
        assert_eq!(
            orig.data.as_ref(),
            round_trip.data.as_ref(),
            "Data should match"
        );
    }
}
#[test]
fn test_retry_with_backoff() {
    let store = FailableObjectStore::new();
    // 80% failure rate: retries will be needed. On a fresh store the
    // deterministic pattern ((op * 97) % 100 < 80) fails op 0 and succeeds
    // op 1, so success is guaranteed well within `max_attempts`.
    store.set_write_failure_rate(80);
    let blob = GitObject::blob(b"retry test".to_vec());
    let max_attempts = 10;
    let mut attempt = 0;
    let mut success = false;
    while attempt < max_attempts && !success {
        attempt += 1;
        match store.put(blob.clone()) {
            Ok(_) => {
                success = true;
                println!("Succeeded on attempt {}", attempt);
            }
            Err(_) => {
                // In real code we would sleep here; the deterministic
                // failure pattern makes an actual delay unnecessary.
                let _backoff_ms = 2_usize.pow(attempt as u32) * 10;
            }
        }
    }
    println!("Retry test: {} attempts, success={}", attempt, success);
    // Previously this test asserted nothing and would pass even if every
    // attempt failed; pin the expected outcome.
    assert!(
        success,
        "Write should succeed within {} attempts",
        max_attempts
    );
}
#[test]
fn test_circuit_breaker_pattern() {
    let store = FailableObjectStore::new();
    let failure_threshold = 3;
    let mut consecutive_failures = 0;
    let mut circuit_open = false;
    let mut operations_while_open = 0;

    // Start with every operation failing so the breaker is forced open.
    store.set_fail_all(true);

    for i in 0..20 {
        if circuit_open {
            // While open, skip the store entirely; after a few skipped
            // operations, simulate backend recovery and close the circuit.
            operations_while_open += 1;
            if operations_while_open > 5 {
                store.set_fail_all(false);
                circuit_open = false;
                consecutive_failures = 0;
            }
            continue;
        }

        match store.put(GitObject::blob(format!("circuit {}", i).into_bytes())) {
            Ok(_) => consecutive_failures = 0,
            Err(_) => {
                consecutive_failures += 1;
                if consecutive_failures >= failure_threshold {
                    circuit_open = true;
                }
            }
        }
    }

    assert!(
        operations_while_open > 0,
        "Circuit breaker should have opened"
    );
}
#[test]
fn test_graceful_degradation() {
    let store = FailableObjectStore::new();
    let primary_id = store
        .put(GitObject::blob(b"primary data".to_vec()))
        .expect("Primary store should succeed");
    let fallback_id = store
        .put(GitObject::blob(b"fallback data".to_vec()))
        .expect("Fallback store should succeed");

    // A quarter of reads fail; the fallback copy is consulted on a miss.
    store.set_read_failure_rate(25);

    let mut retrieved_count = 0;
    let mut used_fallback = 0;
    for _ in 0..100 {
        if let Ok(obj) = store.get(&primary_id) {
            assert_eq!(obj.data.as_ref(), b"primary data");
            retrieved_count += 1;
        } else if let Ok(obj) = store.get(&fallback_id) {
            assert_eq!(obj.data.as_ref(), b"fallback data");
            retrieved_count += 1;
            used_fallback += 1;
        }
        // Both reads failing is tolerated: that is degradation, not a bug.
    }

    println!(
        "Graceful degradation: {} retrieved, {} used fallback",
        retrieved_count, used_fallback
    );
    assert!(
        retrieved_count > 50,
        "Should retrieve data most of the time, got {}",
        retrieved_count
    );
}