mlg 0.1.0

An asynchronous Log for Rust
Documentation
use std::sync::Arc;

use tempfile::tempfile;

use mlg::{FileLog, Log, Offset};
use mlg::LogError::IndexOutOfBounds;

#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct PersonalInfo {
    name: String,
    age: u8,
}

/// Creates a new log for testing.
async fn new_log() -> FileLog {
    let temp_file = tempfile().expect("Failed to create a temp file");
    let temp_file = tokio::fs::File::from_std(temp_file);
    FileLog::new_with_file(temp_file)
}

/// Appends a record to the log.
async fn append_record(log: &FileLog, name: &str, age: u8) -> Offset {
    let data = PersonalInfo {
        name: name.to_string(),
        age,
    };
    let serialized_data = bincode::serialize(&data).expect("Failed to serialize data");
    let offset = log.append(serialized_data).await.expect("Failed to append data");
    offset
}

/// Appends a record to the log and returns the offset.
#[tokio::test]
async fn test_append() {
    let log = new_log().await;

    // Create a record.
    let data = PersonalInfo {
        name: "John Doe".to_string(),
        age: 42,
    };

    // Serialize the record.
    let data = bincode::serialize(&data).expect("Failed to serialize data");

    // Append the record to the log & assert no error
    assert!(log.append(data).await.is_ok());
}

/// Reads a record from the log.
#[tokio::test]
async fn test_read() {
    let log = new_log().await;

    // Create a record.
    let data = PersonalInfo {
        name: "John Doe".to_string(),
        age: 42,
    };

    // Serialize the record.
    let data = bincode::serialize(&data).expect("Failed to serialize data");

    // Append the record to the log.
    let offset = log.append(data).await.expect("Failed to append data");

    // Read the record from the log.
    let read_data = log
        .read(offset)
        .await
        .expect("Failed to read data from log");

    // Deserialize the record.
    let read_data: PersonalInfo =
        bincode::deserialize(&read_data.0).expect("Failed to deserialize data");

    // Assert that the record is deserialized correctly.
    assert_eq!(read_data.name, "John Doe");
    assert_eq!(read_data.age, 42);
}

/// Reads two records from the log.
#[tokio::test]
async fn test_read_two_records() {
    // A test that creates records for john & jane doe, and asserts that both records are deserialized correctly.
    let log = new_log().await;

    // Create two records.
    let john_doe = PersonalInfo {
        name: "John Doe".to_string(),
        age: 42,
    };

    let jane_doe = PersonalInfo {
        name: "Jane Doe".to_string(),
        age: 43,
    };

    // Serialize the records.
    let john_doe = bincode::serialize(&john_doe).expect("Failed to serialize data");
    let jane_doe = bincode::serialize(&jane_doe).expect("Failed to serialize data");

    // Append the records to the log.
    let john_offset = log.append(john_doe).await.expect("Failed to append data");
    let jane_offset = log.append(jane_doe).await.expect("Failed to append data");

    // Read the records from the log.
    let read_john_doe = log
        .read(john_offset)
        .await
        .expect("Failed to read data from log");

    let read_jane_doe = log
        .read(jane_offset)
        .await
        .expect("Failed to read data from log");

    // Deserialize the records.
    let read_john_doe: PersonalInfo =
        bincode::deserialize(&read_john_doe.0).expect("Failed to deserialize data");
    let read_jane_doe: PersonalInfo =
        bincode::deserialize(&read_jane_doe.0).expect("Failed to deserialize data");

    // Assert that the records are deserialized correctly.
    assert_eq!(read_john_doe.name, "John Doe");
    assert_eq!(read_jane_doe.name, "Jane Doe");
    assert_eq!(read_john_doe.age, 42);
    assert_eq!(read_jane_doe.age, 43);
    assert_ne!(read_john_doe.name, read_jane_doe.name);
    assert_ne!(read_john_doe.age, read_jane_doe.age);
}

/// Tests that the next offset is returned correctly.
#[tokio::test]
async fn the_next_offset_is_returned_correctly() {
    let log = new_log().await;

    // Create a record.
    let data = PersonalInfo {
        name: "John Doe".to_string(),
        age: 42,
    };

    // Serialize the record.
    let data = bincode::serialize(&data).expect("Failed to serialize data");

    // Append the record to the log.
    let offset = log.append(data.clone()).await.expect("Failed to append data");

    // Read the record from the log.
    let read_data = log
        .read(offset)
        .await
        .expect("Failed to read data from log");

    // Get the next offset. This is calculated by adding the size of the record to the current offset.
    let expected_next_offset = offset + std::mem::size_of::<u64>() as u64 + data.len() as u64;

    // Assert that the next offset is correct.
    assert_eq!(read_data.1, expected_next_offset);
}

/// Tests that batch reading from the log works correctly when there are fewer records than requested.
#[tokio::test]
async fn test_batch_read_less_records_than_requested() {
    let log = new_log().await;

    // Append two records.
    append_record(&log, "John Doe", 30).await;
    append_record(&log, "Jane Doe", 31).await;

    // Try to read 5 records, expecting only 2 to be returned.
    let (records, _) = log.batch_read(0, 5).await.expect("Failed to batch read from log");

    assert_eq!(records.len(), 2, "Should have read 2 records, but got {}", records.len());
}

/// Tests that batch reading from the log works correctly when there the same number of records as requested.
#[tokio::test]
async fn test_batch_read_exact_number_of_requested_records() {
    let log = new_log().await;

    // Append five records.
    for i in 0..5 {
        let _ = append_record(&log, &format!("User{}", i), 20 + i as u8).await;
    }

    // Try to read 5 records, expecting exactly 5 to be returned.
    let (records, _) = log.batch_read(0, 5).await.expect("Failed to batch read from log");
    assert_eq!(records.len(), 5, "Should have read 5 records, but got {}", records.len());
}

/// Tests that batch reading from the log works correctly when there are more records than requested.
#[tokio::test]
async fn test_batch_read_when_more_records_exist_than_requested() {
    let log = new_log().await;

    // Append ten records.
    for i in 0..10 {
        let _ = append_record(&log, &format!("User{}", i), 20 + i as u8).await;
    }

    // Try to read 5 records, expecting exactly 5 to be returned even though there are more records.
    let (records, _) = log.batch_read(0, 5).await.expect("Failed to batch read from log");
    assert_eq!(records.len(), 5, "Should have read 5 records, but got {}", records.len());
}

#[tokio::test]
async fn test_batch_read_when_no_records_exist_returns_error() {
    let log = new_log().await;

    // Try to read 5 records, expecting 0 to be returned since the log is empty.
    let res = log.batch_read(0, 5).await;

    // Assert that the result is an error.
    assert!(res.is_err());

    // Assert that the error is IndexOutOfBounds.
    match res.unwrap_err() {
        IndexOutOfBounds => {}
        err => panic!("Expected IndexOutOfBounds error, but got {:?}", err),
    }
}

#[tokio::test]
async fn test_batch_read_data_integrity_and_order() {
    let log = new_log().await;

    // Append several records with unique data.
    let mut expected_records = Vec::new();
    for i in 0..10 {
        let name = format!("User{}", i);
        let age = 20 + i as u8;
        append_record(&log, &name, age).await;
        expected_records.push(PersonalInfo { name, age });
    }

    // Perform a batch read to get the same number of records as appended.
    let (records, _) = log.batch_read(0, expected_records.len()).await.expect("Failed to batch read from log");

    // Check that the number of records read matches the number appended.
    assert_eq!(records.len(), expected_records.len(), "Expected {} records, got {}", expected_records.len(), records.len());

    // Check each record for integrity and order.
    for (index, record_bytes) in records.iter().enumerate() {
        let record: PersonalInfo = bincode::deserialize(record_bytes).expect("Failed to deserialize data");
        let expected_record = &expected_records[index];
        assert_eq!(record.name, expected_record.name, "Expected name {}, got {}", expected_record.name, record.name);
        assert_eq!(record.age, expected_record.age, "Expected age {}, got {}", expected_record.age, record.age);
    }
}

/// This test will append two records concurrently and ensure that both writes succeed
/// and that the log contains both entries, no matter the order in which they are written.
#[tokio::test]
async fn test_concurrent_writes() {
    let log = Arc::new(new_log().await);

    // Concurrently append two different records.
    let handle1 =
        tokio::spawn(
            {
                let log = log.clone();
                async move {
                    append_record(&log, "ConcurrentUser1", 25).await;
                }
            });

    let handle2 =
        tokio::spawn(
            {
                let log = log.clone();
                async move {
                    append_record(&log, "ConcurrentUser2", 26).await;
                }
            });

    // Await both handles to finish.
    let _ = tokio::try_join!(handle1, handle2);

    // Read back the log to ensure both records were written.
    let (records, _) = log.batch_read(0, 2).await.expect("Failed to batch read from log");
    assert_eq!(records.len(), 2, "Both records should have been written");

    let record1: PersonalInfo = bincode::deserialize(&records[0]).expect("Failed to deserialize data");
    let record2: PersonalInfo = bincode::deserialize(&records[1]).expect("Failed to deserialize data");

    assert!(record1.name == "ConcurrentUser1" || record1.name == "ConcurrentUser2");
    assert!(record2.name == "ConcurrentUser1" || record2.name == "ConcurrentUser2");
    assert_ne!(record1.name, record2.name);
    assert!(record1.age == 25 || record1.age == 26);
    assert!(record2.age == 25 || record2.age == 26);
    assert_ne!(record1.age, record2.age);
}

/// This test will write a record to the log and then concurrently
/// read it from multiple tasks to ensure that they all see the same value.
#[tokio::test]
async fn test_concurrent_reads_after_write() {
    let log = Arc::new(new_log().await);

    // Append a record.
    let offset = append_record(&log, "User", 30).await;

    // Concurrently read the record from multiple tasks.
    let mut handles = Vec::new();
    for _ in 0..10 {
        let log_clone = log.clone();
        handles.push(tokio::spawn(async move {
            log_clone.read(offset).await.expect("Failed to read data from log")
        }));
    }

    // Collect the results.
    let results = futures::future::join_all(handles).await;

    assert_eq!(results.len(), 10, "Expected 10 results, got {}", results.len());

    // Check that all tasks read the same value.
    for res in results {
        let (bytes, _) = res.expect("Task failed");
        let record: PersonalInfo = bincode::deserialize(&bytes).expect("Failed to deserialize data");
        assert_eq!(record.name, "User");
        assert_eq!(record.age, 30);
    }
}

/// This test ensures that a read following a write will always see the latest written value,
/// even when both operations are performed concurrently.
#[tokio::test]
async fn test_write_read_consistency() {
    let log = Arc::new(new_log().await);
    let barrier = Arc::new(tokio::sync::Barrier::new(2));

    // Spawn a task to write to the log.
    let write_handle = tokio::spawn({
        let log = log.clone();
        let barrier = barrier.clone();
        async move {
            append_record(&log, "ConsistentUser", 40).await;
            barrier.wait().await;
        }
    });

    // Immediately spawn a task to read from the log.
    let read_handle = tokio::spawn({
        let log = log.clone();
        let barrier = barrier.clone();
        async move {
            barrier.wait().await;
            log.read(0).await
        }
    });

    // Wait for both the write and read to complete.
    let (_, read_result) = tokio::try_join!(write_handle, read_handle).expect("Concurrency test failed");

    // Check that the read task got the value written by the write task.
    let (bytes, _) = read_result.expect("Failed to read data from log");
    let record: PersonalInfo = bincode::deserialize(&bytes).expect("Failed to deserialize data");
    assert_eq!(record.name, "ConsistentUser");
    assert_eq!(record.age, 40);
}