sk-store 2.6.0

SimKube trace store library
Documentation
mod pod_watcher_test;

use std::sync::LazyLock;

use assertables::*;
use futures::stream;
use mockall::predicate;
use sk_core::prelude::*;
use sk_testutils::*;
use tokio::sync::mpsc;

use super::*;
use crate::watchers::MockEventHandler;

static EXPECTED_INDEX: LazyLock<HashSet<String>> = LazyLock::new(|| {
    HashSet::from([
        format!("{TEST_NAMESPACE}/depl0"),
        format!("{TEST_NAMESPACE}/depl1"),
        format!("{TEST_NAMESPACE}/depl2"),
    ])
});

#[rstest(tokio::test)]
async fn test_handle_initialize_event() {
    let deployments: Vec<_> = (0..3).map(|i| test_deployment(&format!("depl{i}"))).collect();
    let mut handler = Box::new(MockEventHandler::new());
    for i in 0..3 {
        handler
            .expect_applied()
            .with(predicate::eq(deployments[i].clone()), predicate::eq(0))
            .returning(|_, _| Ok(()))
            .once();
    }

    let (ready_tx, _): (mpsc::Sender<bool>, mpsc::Receiver<bool>) = mpsc::channel(1);
    let mut watcher = ObjWatcher::<DynamicObject>::new(handler, Box::pin(stream::empty()), ready_tx);

    watcher.handle_event(&Event::Init, 0).await.unwrap();
    for depl in deployments {
        watcher.handle_event(&Event::InitApply(depl), 0).await.unwrap();
    }
    watcher.handle_event(&Event::InitDone, 0).await.unwrap();

    assert_bag_eq!(watcher.index, *EXPECTED_INDEX);
}

#[rstest(tokio::test)]
async fn test_handle_initialize_event_with_created_obj() {
    let deployments: Vec<_> = (0..3).map(|i| test_deployment(&format!("depl{i}"))).collect();
    let mut handler = Box::new(MockEventHandler::new());
    for i in 0..3 {
        handler
            .expect_applied()
            .with(predicate::eq(deployments[i].clone()), predicate::eq(0))
            .returning(|_, _| Ok(()))
            .once();
    }

    let (ready_tx, _): (mpsc::Sender<bool>, mpsc::Receiver<bool>) = mpsc::channel(1);
    let mut watcher = ObjWatcher::<DynamicObject>::new(handler, Box::pin(stream::empty()), ready_tx);
    watcher.index = HashSet::from([format!("{TEST_NAMESPACE}/depl0"), format!("{TEST_NAMESPACE}/depl1")]);

    watcher.handle_event(&Event::Init, 0).await.unwrap();
    for depl in deployments {
        watcher.handle_event(&Event::InitApply(depl), 0).await.unwrap();
    }
    watcher.handle_event(&Event::InitDone, 0).await.unwrap();

    assert_bag_eq!(watcher.index, *EXPECTED_INDEX);
}

#[rstest(tokio::test)]
async fn test_handle_initialize_event_with_deleted_obj() {
    let deployments: Vec<_> = (0..2).map(|i| test_deployment(&format!("depl{i}"))).collect();
    let mut handler = Box::new(MockEventHandler::new());
    for i in 0..2 {
        handler
            .expect_applied()
            .with(predicate::eq(deployments[i].clone()), predicate::eq(0))
            .returning(|_, _| Ok(()))
            .once();
    }

    handler
        .expect_deleted()
        .with(predicate::eq(format!("{TEST_NAMESPACE}/depl2")), predicate::eq(0))
        .returning(|_, _| Ok(()))
        .once();

    let (ready_tx, _): (mpsc::Sender<bool>, mpsc::Receiver<bool>) = mpsc::channel(1);
    let mut watcher = ObjWatcher::<DynamicObject>::new(handler, Box::pin(stream::empty()), ready_tx);
    watcher.index = EXPECTED_INDEX.clone();

    watcher.handle_event(&Event::Init, 0).await.unwrap();
    for depl in deployments {
        watcher.handle_event(&Event::InitApply(depl), 0).await.unwrap();
    }
    watcher.handle_event(&Event::InitDone, 0).await.unwrap();

    assert_bag_eq!(
        watcher.index,
        HashSet::from([format!("{TEST_NAMESPACE}/depl0"), format!("{TEST_NAMESPACE}/depl1"),])
    );
}