use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use crate::extras::dirge_paths::ProjectPaths;
use crate::provider::AnyAgent;
static ORCHESTRATOR_IN_FLIGHT: AtomicBool = AtomicBool::new(false);
fn try_claim_orchestrator() -> bool {
ORCHESTRATOR_IN_FLIGHT
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
}
struct InFlightGuard;
impl Drop for InFlightGuard {
fn drop(&mut self) {
ORCHESTRATOR_IN_FLIGHT.store(false, Ordering::Release);
}
}
const STAGE_TIMEOUT: Duration = Duration::from_secs(300);
type Stage = (&'static str, Pin<Box<dyn Future<Output = ()> + Send>>);
pub fn spawn_post_session(
agent: AnyAgent,
paths: ProjectPaths,
digest: crate::agent::session_digest::SessionDigest,
base: String,
) {
tokio::spawn(async move {
if !try_claim_orchestrator() {
tracing::debug!(
target: "dirge::post_session",
"post-session orchestrator already running — skipping overlapping spawn",
);
return;
}
let _in_flight = InFlightGuard;
let transcript =
crate::agent::session_digest::assemble_review_transcript(digest, &paths.root, base);
let stages: Vec<Stage> = vec![
(
"background-review",
Box::pin(stage_background_review(
agent.clone(),
paths.clone(),
transcript,
)),
),
(
"skills-curator",
Box::pin(stage_skills_curator(agent.clone(), paths.clone())),
),
(
"memory-curator",
Box::pin(stage_memory_curator(agent.clone(), paths.clone())),
),
];
run_stages_sequentially(stages, STAGE_TIMEOUT).await;
crate::agent::agent_loop::context_manager::mark_memories_dirty();
});
}
async fn run_stages_sequentially(stages: Vec<Stage>, per_stage_timeout: Duration) {
for (name, fut) in stages {
match tokio::time::timeout(per_stage_timeout, fut).await {
Ok(()) => {}
Err(_) => {
tracing::warn!(
target: "dirge::post_session",
stage = %name,
timeout_secs = %per_stage_timeout.as_secs(),
"post-session stage timed out — skipping, continuing chain",
);
}
}
}
}
async fn stage_background_review(agent: AnyAgent, paths: ProjectPaths, transcript: String) {
crate::agent::review::run_background_review(agent, paths, transcript, None).await;
}
async fn stage_skills_curator(agent: AnyAgent, paths: ProjectPaths) {
let paths_for_blocking = paths.clone();
let candidate_list = tokio::task::spawn_blocking(move || {
let mut curator = crate::extras::skills::curator::Curator::new(&paths_for_blocking).ok()?;
if !curator.should_run_now() {
return None;
}
let _ = curator.apply_automatic_transitions();
crate::extras::skills::usage::UsageStore::load(&paths_for_blocking)
.ok()
.map(|store| crate::extras::skills::curator::render_candidate_list(&store))
})
.await
.ok()
.flatten();
if let Some(candidates) = candidate_list {
crate::agent::review::run_curator_review(agent, paths, candidates).await;
}
}
async fn stage_memory_curator(agent: AnyAgent, paths: ProjectPaths) {
let paths_for_blocking = paths.clone();
let mechanical_report = tokio::task::spawn_blocking(move || {
let mut curator =
match crate::extras::memory_curator::MemoryCurator::new(&paths_for_blocking) {
Ok(c) => c,
Err(e) => {
tracing::debug!(
target: "dirge::memory_curator",
error = %e,
"Failed to construct memory curator — skipping run",
);
return None;
}
};
if !curator.should_run_now() {
return None;
}
match curator.run_mechanical_pass() {
Ok(report) => {
tracing::info!(
target: "dirge::memory_curator",
total = %report.total_entries,
stale = %report.stale_candidates.len(),
"memory curator mechanical pass complete",
);
Some(report)
}
Err(e) => {
tracing::warn!(
target: "dirge::memory_curator",
error = %e,
"memory curator mechanical pass failed",
);
None
}
}
})
.await
.ok()
.flatten();
if let Some(report) = mechanical_report {
crate::agent::review::run_memory_curator_review(agent, paths, report).await;
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
fn recording_stage(
name: &'static str,
log: Arc<Mutex<Vec<&'static str>>>,
inflight: Arc<AtomicUsize>,
max_inflight: Arc<AtomicUsize>,
) -> Stage {
let fut = async move {
let now = inflight.fetch_add(1, Ordering::SeqCst) + 1;
max_inflight.fetch_max(now, Ordering::SeqCst);
tokio::task::yield_now().await;
log.lock().unwrap().push(name);
inflight.fetch_sub(1, Ordering::SeqCst);
};
(name, Box::pin(fut))
}
#[tokio::test]
async fn run_stages_sequentially_runs_in_order_without_overlap() {
let log = Arc::new(Mutex::new(Vec::new()));
let inflight = Arc::new(AtomicUsize::new(0));
let max_inflight = Arc::new(AtomicUsize::new(0));
let stages = vec![
recording_stage("a", log.clone(), inflight.clone(), max_inflight.clone()),
recording_stage("b", log.clone(), inflight.clone(), max_inflight.clone()),
recording_stage("c", log.clone(), inflight.clone(), max_inflight.clone()),
];
run_stages_sequentially(stages, Duration::from_secs(5)).await;
assert_eq!(*log.lock().unwrap(), vec!["a", "b", "c"], "strict order");
assert_eq!(
max_inflight.load(Ordering::SeqCst),
1,
"at most one stage in flight at any instant",
);
}
#[tokio::test(start_paused = true)]
async fn run_stages_sequentially_skips_timed_out_stage_and_continues() {
let log = Arc::new(Mutex::new(Vec::new()));
let log_slow = log.clone();
let slow: Stage = (
"slow",
Box::pin(async move {
tokio::time::sleep(Duration::from_secs(600)).await;
log_slow.lock().unwrap().push("slow");
}),
);
let log_after = log.clone();
let after: Stage = (
"after",
Box::pin(async move {
log_after.lock().unwrap().push("after");
}),
);
run_stages_sequentially(vec![slow, after], Duration::from_secs(300)).await;
assert_eq!(
*log.lock().unwrap(),
vec!["after"],
"slow stage must be skipped (never pushes), after stage still runs",
);
}
#[tokio::test]
async fn run_stages_sequentially_handles_empty_list() {
run_stages_sequentially(vec![], Duration::from_secs(5)).await;
}
#[test]
fn orchestrator_claim_is_exclusive_and_released_on_drop() {
assert!(try_claim_orchestrator(), "first claim must succeed");
{
let _in_flight = InFlightGuard;
assert!(
!try_claim_orchestrator(),
"a concurrent claim must be refused while one is in flight",
);
} assert!(
try_claim_orchestrator(),
"claim must succeed again after the guard released the slot",
);
ORCHESTRATOR_IN_FLIGHT.store(false, Ordering::Release);
}
#[tokio::test(start_paused = true)]
async fn run_stages_sequentially_lets_fast_stages_complete() {
let log = Arc::new(Mutex::new(Vec::new()));
let log_fast = log.clone();
let fast: Stage = (
"fast",
Box::pin(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
log_fast.lock().unwrap().push("fast");
}),
);
run_stages_sequentially(vec![fast], Duration::from_secs(300)).await;
assert_eq!(*log.lock().unwrap(), vec!["fast"]);
}
}