use std::collections::BTreeMap;
use cargowatch_core::{SessionEvent, SessionMode, SessionStatus};
use cargowatch_runner::{ManagedRunRequest, spawn_managed_session};
use cargowatch_store::SessionStore;
use tokio::sync::mpsc;
/// End-to-end check of a managed session: builds a throwaway crate through
/// `spawn_managed_session`, writes every emitted event to a `SessionStore`
/// via `persist_event`, and then verifies what the store reports back.
///
/// The assertions cover the finish summary (matches `handle.wait()` and is
/// `Succeeded`), the stored session (mode, status, non-empty logs and
/// artifacts, non-decreasing artifact sequence numbers, an artifact targeting
/// `demo`), and the recent-session history (exactly this one session).
#[tokio::test]
async fn managed_session_persists_logs_and_ordered_artifacts() {
    // Scratch workspace holding a minimal binary crate named `demo`.
    let temp = tempfile::tempdir().expect("tempdir");
    let project = temp.path().join("demo");
    let manifest_path = project.join("Cargo.toml");
    std::fs::create_dir_all(project.join("src")).expect("create src");
    std::fs::write(
        &manifest_path,
        "[package]\nname = \"demo\"\nversion = \"0.1.0\"\nedition = \"2024\"\n",
    )
    .expect("write manifest");
    std::fs::write(
        project.join("src/main.rs"),
        "fn main() {\n println!(\"cargo watch test\");\n}\n",
    )
    .expect("write source");

    // The store lives next to the project inside the same temp directory.
    let db_path = temp.path().join("history.sqlite");
    let store = SessionStore::connect(&db_path).await.expect("store");

    // Command line for the managed run: a quiet, offline build of the crate.
    let command: Vec<String> = vec![
        "cargo".to_owned(),
        "build".to_owned(),
        "--quiet".to_owned(),
        "--offline".to_owned(),
        "--manifest-path".to_owned(),
        manifest_path.display().to_string(),
    ];
    let request = ManagedRunRequest::new(command, project.clone(), Some(project.clone()));

    let (event_tx, mut event_rx) = mpsc::unbounded_channel();
    let handle = spawn_managed_session(request, event_tx).expect("managed session");
    let session_id = handle.session_id().to_string();

    // Drain events into the store until the session reports that it finished.
    let max_persisted_log_lines: usize = 512;
    let mut persisted_log_counts = BTreeMap::new();
    let finish = loop {
        let event = event_rx.recv().await.expect("session event");
        persist_event(
            &store,
            &event,
            &mut persisted_log_counts,
            max_persisted_log_lines,
        )
        .await
        .expect("persist event");
        match &event {
            SessionEvent::SessionFinished(summary) => break summary.clone(),
            _ => continue,
        }
    };

    // The summary seen on the channel must agree with the handle's own result.
    let waited = handle.wait().await.expect("wait");
    assert_eq!(finish, waited);
    assert_eq!(finish.status, SessionStatus::Succeeded);

    // Reload the session from the store and inspect what was persisted.
    let session = store
        .load_session(&session_id, 512)
        .await
        .expect("load")
        .expect("session");
    assert_eq!(session.info.mode, SessionMode::Managed);
    assert_eq!(session.info.status, SessionStatus::Succeeded);
    assert!(!session.logs.is_empty());
    assert!(!session.artifacts.is_empty());

    // Every adjacent pair of artifacts must have non-decreasing sequence numbers.
    let artifacts_in_order = session
        .artifacts
        .iter()
        .zip(session.artifacts.iter().skip(1))
        .all(|(earlier, later)| earlier.sequence <= later.sequence);
    assert!(artifacts_in_order);

    let built_demo_target = session
        .artifacts
        .iter()
        .any(|artifact| artifact.target.as_deref() == Some("demo"));
    assert!(built_demo_target);

    // The history listing should contain only the session that just ran.
    let history = store.recent_sessions(10).await.expect("history");
    assert_eq!(history.len(), 1);
    let latest = &history[0];
    assert_eq!(latest.info.session_id, session_id);
    assert_eq!(latest.info.mode, SessionMode::Managed);
    assert_eq!(latest.info.status, SessionStatus::Succeeded);
}
/// Writes a single `SessionEvent` to `store`, dispatching on the event variant.
///
/// Output lines are capped per session: `persisted_log_counts` tracks how many
/// lines have been stored for each session id, and once that count reaches
/// `max_persisted_log_lines` further lines for that session are skipped.
///
/// # Errors
///
/// Propagates any error returned by the underlying store call.
async fn persist_event(
    store: &SessionStore,
    event: &SessionEvent,
    persisted_log_counts: &mut BTreeMap<String, usize>,
    max_persisted_log_lines: usize,
) -> anyhow::Result<()> {
    match event {
        SessionEvent::OutputLine { session_id, entry } => {
            let stored_so_far = persisted_log_counts.entry(session_id.clone()).or_default();
            // Cap reached for this session: drop the line without touching the store.
            if *stored_so_far >= max_persisted_log_lines {
                return Ok(());
            }
            store.insert_log_line(session_id, entry).await?;
            // Count the line only after the insert succeeded.
            *stored_so_far += 1;
        }
        SessionEvent::SessionStarted(info) => {
            store.insert_session_start(info).await?
        }
        SessionEvent::SessionFinished(finish) => {
            store.finish_session(finish).await?
        }
        SessionEvent::Diagnostic {
            session_id,
            diagnostic,
        } => {
            store.insert_diagnostic(session_id, diagnostic).await?;
        }
        SessionEvent::ArtifactBuilt {
            session_id,
            artifact,
        } => {
            store.insert_artifact(session_id, artifact).await?;
        }
        // Detection and update events share the same upsert path.
        SessionEvent::ProcessDetected(process) | SessionEvent::ProcessUpdated(process) => {
            store.upsert_detected_process(process).await?;
        }
        SessionEvent::ProcessGone {
            session_id,
            observed_at,
            ..
        } => {
            store.mark_process_gone(session_id, *observed_at).await?;
        }
    }

    Ok(())
}