use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::watch;
use super::events::{
ProcessAwaitOutput, ProcessEvent, ProcessEventAppendRequest, ProcessEventAppendResult,
};
use super::model::{
AbandonRequest, ProcessExternalRef, ProcessHandleDescriptor, ProcessHandleGrant,
ProcessHandleGrantEntry, ProcessLease, ProcessLeaseClaimOutcome, ProcessLeaseCompletion,
ProcessListFilter, ProcessRecord, ProcessRegistration, ProcessSessionDeleteReport,
ProcessStarted, SessionScope, WaitState,
};
use super::registry::{ProcessPruneReport, ProcessRegistry};
use crate::PluginError;
const AWAIT_BACKOFF_MIN: Duration = Duration::from_millis(25);
const AWAIT_BACKOFF_MAX: Duration = Duration::from_secs(1);
#[derive(Clone, Default)]
pub struct ProcessChangeHub {
inner: Arc<Mutex<HashMap<String, watch::Sender<u64>>>>,
}
impl ProcessChangeHub {
pub fn new() -> Self {
Self::default()
}
pub fn subscribe(&self, process_id: &str) -> watch::Receiver<u64> {
let mut guard = self.inner.lock().expect("process change hub lock poisoned");
guard
.entry(process_id.to_string())
.or_insert_with(|| {
let (tx, _rx) = watch::channel(0);
tx
})
.subscribe()
}
pub fn notify(&self, process_id: &str) {
let mut guard = self.inner.lock().expect("process change hub lock poisoned");
let mut remove = false;
if let Some(tx) = guard.get(process_id) {
if tx.receiver_count() == 0 {
remove = true;
} else {
let next = (*tx.borrow()).wrapping_add(1);
if tx.send(next).is_err() {
remove = true;
}
}
}
if remove {
guard.remove(process_id);
}
}
#[cfg(test)]
fn tracked_processes(&self) -> usize {
self.inner
.lock()
.expect("process change hub lock poisoned")
.len()
}
}
#[async_trait::async_trait]
pub trait ProcessEventSink: Send + Sync {
async fn emit(&self, event: &ProcessEvent);
}
struct WatchedProcessRegistry {
inner: Arc<dyn ProcessRegistry>,
hub: ProcessChangeHub,
sink: Option<Arc<dyn ProcessEventSink>>,
}
pub fn watch_process_registry(
inner: Arc<dyn ProcessRegistry>,
) -> (Arc<dyn ProcessRegistry>, ProcessChangeHub) {
watch_process_registry_with_sink(inner, None)
}
pub fn watch_process_registry_with_sink(
inner: Arc<dyn ProcessRegistry>,
sink: Option<Arc<dyn ProcessEventSink>>,
) -> (Arc<dyn ProcessRegistry>, ProcessChangeHub) {
let hub = ProcessChangeHub::new();
(
Arc::new(WatchedProcessRegistry {
inner,
hub: hub.clone(),
sink,
}),
hub,
)
}
#[derive(Clone)]
pub struct ProcessAwaiter {
registry: Arc<dyn ProcessRegistry>,
hub: Option<ProcessChangeHub>,
}
impl ProcessAwaiter {
pub fn new(registry: Arc<dyn ProcessRegistry>, hub: ProcessChangeHub) -> Self {
Self {
registry,
hub: Some(hub),
}
}
pub fn polling(registry: Arc<dyn ProcessRegistry>) -> Self {
Self {
registry,
hub: None,
}
}
pub async fn await_terminal(
&self,
process_id: &str,
) -> Result<ProcessAwaitOutput, PluginError> {
let mut backoff = AWAIT_BACKOFF_MIN;
if let Some(hub) = self.hub.as_ref() {
let mut rx = hub.subscribe(process_id);
loop {
if let Some(output) = self.read_terminal(process_id).await? {
return Ok(output);
}
tokio::select! {
changed = rx.changed() => {
match changed {
Ok(()) => backoff = AWAIT_BACKOFF_MIN,
Err(_) => break,
}
}
_ = tokio::time::sleep(backoff) => {
backoff = next_backoff(backoff);
}
}
}
}
loop {
if let Some(output) = self.read_terminal(process_id).await? {
return Ok(output);
}
tokio::time::sleep(backoff).await;
backoff = next_backoff(backoff);
}
}
pub async fn await_event(
&self,
process_id: &str,
event_type: &str,
after_sequence: u64,
) -> Result<ProcessEvent, PluginError> {
let mut backoff = AWAIT_BACKOFF_MIN;
if let Some(hub) = self.hub.as_ref() {
let mut rx = hub.subscribe(process_id);
loop {
if let Some(event) = self
.read_event(process_id, event_type, after_sequence)
.await?
{
return Ok(event);
}
tokio::select! {
changed = rx.changed() => {
match changed {
Ok(()) => backoff = AWAIT_BACKOFF_MIN,
Err(_) => break,
}
}
_ = tokio::time::sleep(backoff) => {
backoff = next_backoff(backoff);
}
}
}
}
loop {
if let Some(event) = self
.read_event(process_id, event_type, after_sequence)
.await?
{
return Ok(event);
}
tokio::time::sleep(backoff).await;
backoff = next_backoff(backoff);
}
}
async fn read_terminal(
&self,
process_id: &str,
) -> Result<Option<ProcessAwaitOutput>, PluginError> {
let record = self
.registry
.get_process(process_id)
.await
.ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
Ok(record.status.await_output().cloned())
}
async fn read_event(
&self,
process_id: &str,
event_type: &str,
after_sequence: u64,
) -> Result<Option<ProcessEvent>, PluginError> {
Ok(self
.registry
.events_after(process_id, after_sequence)
.await?
.into_iter()
.find(|event| event.event_type == event_type))
}
}
fn next_backoff(current: Duration) -> Duration {
current.saturating_mul(2).min(AWAIT_BACKOFF_MAX)
}
#[async_trait::async_trait]
pub trait ProcessAttach: Send + Sync {
async fn await_terminal(&self, process_id: &str) -> Result<ProcessAwaitOutput, PluginError>;
}
#[async_trait::async_trait]
impl ProcessRegistry for WatchedProcessRegistry {
fn durability_tier(&self) -> crate::DurabilityTier {
self.inner.durability_tier()
}
async fn register_process(
&self,
registration: ProcessRegistration,
) -> Result<ProcessRecord, PluginError> {
let process_id = registration.id.clone();
let record = self.inner.register_process(registration).await?;
self.hub.notify(&process_id);
Ok(record)
}
async fn set_external_ref(
&self,
process_id: &str,
external_ref: ProcessExternalRef,
) -> Result<ProcessRecord, PluginError> {
let record = self
.inner
.set_external_ref(process_id, external_ref)
.await?;
self.hub.notify(process_id);
Ok(record)
}
async fn grant_handle(
&self,
session_scope: &SessionScope,
process_id: &str,
descriptor: ProcessHandleDescriptor,
) -> Result<ProcessHandleGrant, PluginError> {
self.inner
.grant_handle(session_scope, process_id, descriptor)
.await
}
async fn revoke_handle(
&self,
session_scope: &SessionScope,
process_id: &str,
) -> Result<(), PluginError> {
self.inner.revoke_handle(session_scope, process_id).await
}
async fn transfer_handle_grants(
&self,
from_scope: &SessionScope,
to_scope: &SessionScope,
process_ids: &[String],
) -> Result<(), PluginError> {
self.inner
.transfer_handle_grants(from_scope, to_scope, process_ids)
.await
}
async fn list_handle_grants(
&self,
session_scope: &SessionScope,
) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
self.inner.list_handle_grants(session_scope).await
}
async fn list_live_handle_grants(
&self,
session_scope: &SessionScope,
) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
self.inner.list_live_handle_grants(session_scope).await
}
async fn has_handle_grant(
&self,
session_scope: &SessionScope,
process_id: &str,
) -> Result<bool, PluginError> {
self.inner.has_handle_grant(session_scope, process_id).await
}
async fn handle_grants_for_process(
&self,
process_id: &str,
) -> Result<Vec<ProcessHandleGrant>, PluginError> {
self.inner.handle_grants_for_process(process_id).await
}
async fn delete_session_process_state(
&self,
session_id: &str,
) -> Result<ProcessSessionDeleteReport, PluginError> {
self.inner.delete_session_process_state(session_id).await
}
async fn append_event(
&self,
process_id: &str,
request: ProcessEventAppendRequest,
) -> Result<ProcessEventAppendResult, PluginError> {
let result = self.inner.append_event(process_id, request).await?;
self.hub.notify(process_id);
if let Some(sink) = self.sink.as_ref() {
sink.emit(&result.event).await;
}
Ok(result)
}
async fn events_after(
&self,
process_id: &str,
after_sequence: u64,
) -> Result<Vec<ProcessEvent>, PluginError> {
self.inner.events_after(process_id, after_sequence).await
}
async fn count_events_through(
&self,
process_id: &str,
event_type: &str,
up_to_sequence: u64,
) -> Result<u64, PluginError> {
self.inner
.count_events_through(process_id, event_type, up_to_sequence)
.await
}
async fn recent_events(
&self,
process_id: &str,
limit: usize,
) -> Result<Vec<ProcessEvent>, PluginError> {
self.inner.recent_events(process_id, limit).await
}
async fn wake_events_after(
&self,
process_id: &str,
after_sequence: u64,
) -> Result<Vec<ProcessEvent>, PluginError> {
self.inner
.wake_events_after(process_id, after_sequence)
.await
}
async fn complete_process(
&self,
process_id: &str,
await_output: ProcessAwaitOutput,
) -> Result<ProcessRecord, PluginError> {
let record = self
.inner
.complete_process(process_id, await_output)
.await?;
self.hub.notify(process_id);
Ok(record)
}
async fn record_first_started(
&self,
process_id: &str,
started: ProcessStarted,
) -> Result<ProcessRecord, PluginError> {
let record = self.inner.record_first_started(process_id, started).await?;
self.hub.notify(process_id);
Ok(record)
}
async fn request_process_abandon(
&self,
process_id: &str,
request: AbandonRequest,
) -> Result<ProcessRecord, PluginError> {
let record = self
.inner
.request_process_abandon(process_id, request)
.await?;
self.hub.notify(process_id);
Ok(record)
}
async fn set_process_wait(
&self,
process_id: &str,
wait: WaitState,
) -> Result<ProcessRecord, PluginError> {
let record = self.inner.set_process_wait(process_id, wait).await?;
self.hub.notify(process_id);
Ok(record)
}
async fn clear_process_wait(&self, process_id: &str) -> Result<ProcessRecord, PluginError> {
let record = self.inner.clear_process_wait(process_id).await?;
self.hub.notify(process_id);
Ok(record)
}
async fn get_process(&self, process_id: &str) -> Option<ProcessRecord> {
self.inner.get_process(process_id).await
}
async fn list_processes(
&self,
filter: &ProcessListFilter,
) -> Result<Vec<ProcessRecord>, PluginError> {
self.inner.list_processes(filter).await
}
async fn ack_wake(&self, process_id: &str, sequence: u64) -> Result<(), PluginError> {
self.inner.ack_wake(process_id, sequence).await?;
self.hub.notify(process_id);
Ok(())
}
async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, PluginError> {
self.inner.list_non_terminal().await
}
async fn claim_process_lease(
&self,
process_id: &str,
owner: &crate::LeaseOwnerIdentity,
lease_ttl_ms: u64,
) -> Result<ProcessLeaseClaimOutcome, PluginError> {
self.inner
.claim_process_lease(process_id, owner, lease_ttl_ms)
.await
}
async fn reclaim_process_lease(
&self,
process_id: &str,
owner: &crate::LeaseOwnerIdentity,
observed_holder: &ProcessLease,
lease_ttl_ms: u64,
) -> Result<ProcessLeaseClaimOutcome, PluginError> {
self.inner
.reclaim_process_lease(process_id, owner, observed_holder, lease_ttl_ms)
.await
}
async fn renew_process_lease(
&self,
lease: &ProcessLease,
lease_ttl_ms: u64,
) -> Result<ProcessLease, PluginError> {
self.inner.renew_process_lease(lease, lease_ttl_ms).await
}
async fn get_process_lease(
&self,
process_id: &str,
) -> Result<Option<ProcessLease>, PluginError> {
self.inner.get_process_lease(process_id).await
}
async fn complete_process_lease(
&self,
completion: &ProcessLeaseCompletion,
) -> Result<(), PluginError> {
self.inner.complete_process_lease(completion).await
}
async fn prune_terminal_processes(
&self,
cutoff_epoch_ms: u64,
) -> Result<ProcessPruneReport, PluginError> {
self.inner.prune_terminal_processes(cutoff_epoch_ms).await
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::*;
use crate::{
ProcessInput, ProcessProvenance, ProcessRegistration, TestLocalProcessRegistry, ToolControl,
};
fn registration(process_id: &str) -> ProcessRegistration {
ProcessRegistration::new(
process_id,
ProcessInput::External {
metadata: serde_json::json!({}),
},
crate::RecoveryDisposition::ExternallyOwned,
ProcessProvenance::host(),
)
}
fn plain_event_type(name: &str) -> crate::ProcessEventType {
crate::ProcessEventType {
name: name.to_string(),
payload_schema: crate::LashSchema::any(),
semantics: crate::ProcessEventSemanticsSpec::default(),
}
}
fn registration_with_events(process_id: &str, event_types: &[&str]) -> ProcessRegistration {
registration(process_id)
.with_extra_event_types(event_types.iter().map(|name| plain_event_type(name)))
}
#[derive(Clone, Default)]
struct CollectingSink {
events: Arc<Mutex<Vec<(String, u64)>>>,
}
impl CollectingSink {
fn collected(&self) -> Vec<(String, u64)> {
self.events.lock().expect("sink lock").clone()
}
}
#[async_trait::async_trait]
impl ProcessEventSink for CollectingSink {
async fn emit(&self, event: &ProcessEvent) {
self.events
.lock()
.expect("sink lock")
.push((event.event_type.clone(), event.sequence));
}
}
fn success(value: serde_json::Value) -> ProcessAwaitOutput {
ProcessAwaitOutput::Success {
value,
control: None::<ToolControl>,
}
}
#[test]
fn backoff_schedule_has_25ms_floor_doubling_to_1s_cap() {
assert_eq!(AWAIT_BACKOFF_MIN, Duration::from_millis(25));
assert_eq!(AWAIT_BACKOFF_MAX, Duration::from_secs(1));
let mut backoff = AWAIT_BACKOFF_MIN;
let mut schedule = vec![backoff];
while backoff < AWAIT_BACKOFF_MAX {
backoff = next_backoff(backoff);
schedule.push(backoff);
}
assert_eq!(
schedule,
[25, 50, 100, 200, 400, 800, 1000]
.into_iter()
.map(Duration::from_millis)
.collect::<Vec<_>>(),
"the backoff doubles from the 25ms floor and saturates at the 1s cap"
);
assert_eq!(
next_backoff(AWAIT_BACKOFF_MAX),
AWAIT_BACKOFF_MAX,
"the cap is absorbing"
);
}
#[tokio::test]
async fn prune_through_decorator_does_not_bump_the_hub() {
let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
let (registry, hub) = watch_process_registry(raw);
registry
.register_process(registration("proc-terminal"))
.await
.expect("register terminal");
registry
.complete_process("proc-terminal", success(serde_json::json!("done")))
.await
.expect("complete");
registry
.register_process(registration("proc-live"))
.await
.expect("register live");
let mut terminal_rx = hub.subscribe("proc-terminal");
let mut live_rx = hub.subscribe("proc-live");
terminal_rx.mark_unchanged();
live_rx.mark_unchanged();
let report = registry
.prune_terminal_processes(u64::MAX)
.await
.expect("prune");
assert_eq!(report.pruned_processes, 1, "the terminal process pruned");
assert!(
!terminal_rx.has_changed().expect("terminal sender open"),
"prune must not bump the pruned process's hub entry"
);
assert!(
!live_rx.has_changed().expect("live sender open"),
"prune must not bump surviving processes' hub entries"
);
}
#[tokio::test]
async fn hub_subscribe_then_notify_wakes_and_gc_drops_empty_entry() {
let hub = ProcessChangeHub::new();
let mut rx = hub.subscribe("proc");
hub.notify("proc");
tokio::time::timeout(Duration::from_millis(100), rx.changed())
.await
.expect("notify should wake")
.expect("sender remains open");
drop(rx);
hub.notify("proc");
assert_eq!(hub.tracked_processes(), 0);
}
#[tokio::test]
async fn await_event_returns_historical_event_immediately() {
let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
let (registry, hub) = watch_process_registry(raw);
registry
.register_process(registration("proc"))
.await
.expect("register");
let appended = registry
.append_event(
"proc",
ProcessEventAppendRequest::cancel_requested("proc", Some("stop".to_string())),
)
.await
.expect("append");
let event = ProcessAwaiter::new(Arc::clone(®istry), hub)
.await_event("proc", "process.cancel_requested", 0)
.await
.expect("await event");
assert_eq!(event.sequence, appended.event.sequence);
}
#[tokio::test]
async fn await_terminal_unknown_process_errors() {
let registry = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
let err = ProcessAwaiter::polling(registry)
.await_terminal("missing")
.await
.expect_err("unknown process should error");
assert!(err.to_string().contains("unknown process `missing`"));
}
#[tokio::test]
async fn polling_awaiter_resolves_via_backoff() {
let registry = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
registry
.register_process(registration("proc"))
.await
.expect("register");
let writer = Arc::clone(®istry);
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(10)).await;
writer
.complete_process("proc", success(serde_json::json!({ "ok": true })))
.await
.expect("complete");
});
let output = tokio::time::timeout(
Duration::from_secs(1),
ProcessAwaiter::polling(registry).await_terminal("proc"),
)
.await
.expect("polling await timeout")
.expect("await terminal");
assert_eq!(output, success(serde_json::json!({ "ok": true })));
}
#[tokio::test]
async fn watched_awaiter_observes_terminal_without_lost_wakeup() {
let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
let (registry, hub) = watch_process_registry(raw);
registry
.register_process(registration("proc"))
.await
.expect("register");
let awaiter = ProcessAwaiter::new(Arc::clone(®istry), hub);
let waiter = tokio::spawn(async move { awaiter.await_terminal("proc").await });
registry
.complete_process("proc", success(serde_json::json!("done")))
.await
.expect("complete");
let output = tokio::time::timeout(Duration::from_millis(200), waiter)
.await
.expect("watched await timeout")
.expect("join")
.expect("await terminal");
assert_eq!(output, success(serde_json::json!("done")));
}
#[tokio::test]
async fn watched_registry_bumps_on_mutations() {
let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
let (registry, hub) = watch_process_registry(raw);
let mut rx = hub.subscribe("proc");
registry
.register_process(registration("proc"))
.await
.expect("register");
tokio::time::timeout(Duration::from_millis(100), rx.changed())
.await
.expect("register bump")
.expect("sender remains open");
registry
.append_event(
"proc",
ProcessEventAppendRequest::cancel_requested("proc", None),
)
.await
.expect("append");
tokio::time::timeout(Duration::from_millis(100), rx.changed())
.await
.expect("append bump")
.expect("sender remains open");
}
#[tokio::test]
async fn sink_receives_appended_events_in_order() {
let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
let sink = CollectingSink::default();
let (registry, _hub) = watch_process_registry_with_sink(raw, Some(Arc::new(sink.clone())));
registry
.register_process(registration_with_events(
"proc",
&["producer.a", "producer.b"],
))
.await
.expect("register");
registry
.append_event(
"proc",
ProcessEventAppendRequest::new("producer.a", serde_json::json!({})),
)
.await
.expect("append a");
registry
.append_event(
"proc",
ProcessEventAppendRequest::new("producer.b", serde_json::json!({})),
)
.await
.expect("append b");
assert_eq!(
sink.collected(),
vec![("producer.a".to_string(), 1), ("producer.b".to_string(), 2)],
"the sink must observe appended events after their write, in append order"
);
}
#[tokio::test]
async fn sink_absent_leaves_appends_unchanged() {
let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
let (registry, _hub) = watch_process_registry_with_sink(raw, None);
registry
.register_process(registration_with_events("proc", &["producer.a"]))
.await
.expect("register");
let appended = registry
.append_event(
"proc",
ProcessEventAppendRequest::new("producer.a", serde_json::json!({})),
)
.await
.expect("append succeeds with no sink installed");
assert_eq!(appended.event.sequence, 1);
}
#[tokio::test]
async fn sink_not_invoked_for_complete_process_terminal_append() {
let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
let sink = CollectingSink::default();
let (registry, _hub) = watch_process_registry_with_sink(raw, Some(Arc::new(sink.clone())));
registry
.register_process(registration_with_events("proc", &["producer.a"]))
.await
.expect("register");
registry
.append_event(
"proc",
ProcessEventAppendRequest::new("producer.a", serde_json::json!({})),
)
.await
.expect("explicit append");
registry
.complete_process("proc", success(serde_json::json!("done")))
.await
.expect("complete");
assert_eq!(
sink.collected(),
vec![("producer.a".to_string(), 1)],
"complete_process appends its terminal event through the inner registry, so the \
decorator never emits it to the sink"
);
}
#[tokio::test]
async fn sink_present_still_bumps_hub_on_append() {
let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
let sink = CollectingSink::default();
let (registry, hub) = watch_process_registry_with_sink(raw, Some(Arc::new(sink)));
let mut rx = hub.subscribe("proc");
registry
.register_process(registration_with_events("proc", &["producer.a"]))
.await
.expect("register");
tokio::time::timeout(Duration::from_millis(100), rx.changed())
.await
.expect("register bump")
.expect("sender remains open");
registry
.append_event(
"proc",
ProcessEventAppendRequest::new("producer.a", serde_json::json!({})),
)
.await
.expect("append");
tokio::time::timeout(Duration::from_millis(100), rx.changed())
.await
.expect("append bump with a sink installed")
.expect("sender remains open");
}
struct NoopRunHandle;
#[async_trait::async_trait]
impl crate::ProcessRunHandle for NoopRunHandle {
async fn claim_and_run_pending(&self) -> Result<(), PluginError> {
Ok(())
}
}
struct PanicAttach;
#[async_trait::async_trait]
impl ProcessAttach for PanicAttach {
async fn await_terminal(
&self,
_process_id: &str,
) -> Result<ProcessAwaitOutput, PluginError> {
panic!("attach should not be called for already-terminal process")
}
}
struct ErrorAttach;
#[async_trait::async_trait]
impl ProcessAttach for ErrorAttach {
async fn await_terminal(
&self,
_process_id: &str,
) -> Result<ProcessAwaitOutput, PluginError> {
Err(PluginError::Session("attach failed".to_string()))
}
}
#[tokio::test]
async fn driver_short_circuits_terminal_before_attach() {
let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
let driver = crate::ProcessWorkDriver::new(raw, Arc::new(NoopRunHandle))
.with_attach(Arc::new(PanicAttach));
let registry = driver.process_registry();
registry
.register_process(registration("proc"))
.await
.expect("register");
registry
.complete_process("proc", success(serde_json::json!("ready")))
.await
.expect("complete");
let output = driver.await_terminal("proc").await.expect("await terminal");
assert_eq!(output, success(serde_json::json!("ready")));
}
#[tokio::test]
async fn driver_attach_errors_propagate_without_poll_fallback() {
let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
let driver = crate::ProcessWorkDriver::new(raw, Arc::new(NoopRunHandle))
.with_attach(Arc::new(ErrorAttach));
driver
.process_registry()
.register_process(registration("proc"))
.await
.expect("register");
let err = driver
.await_terminal("proc")
.await
.expect_err("attach error should propagate");
assert!(err.to_string().contains("attach failed"));
}
struct CountingAttach {
calls: Arc<std::sync::atomic::AtomicUsize>,
}
#[async_trait::async_trait]
impl ProcessAttach for CountingAttach {
async fn await_terminal(
&self,
_process_id: &str,
) -> Result<ProcessAwaitOutput, PluginError> {
self.calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
Err(PluginError::Session(
"attach must not be consulted for a terminal process".to_string(),
))
}
}
#[tokio::test]
async fn concurrent_waiters_all_resolve_with_identical_output_on_completion() {
let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
let (registry, hub) = watch_process_registry(raw);
registry
.register_process(registration("proc"))
.await
.expect("register");
const WAITERS: usize = 16;
let barrier = Arc::new(tokio::sync::Barrier::new(WAITERS + 1));
let mut waiters = Vec::with_capacity(WAITERS);
for _ in 0..WAITERS {
let awaiter = ProcessAwaiter::new(Arc::clone(®istry), hub.clone());
let barrier = Arc::clone(&barrier);
waiters.push(tokio::spawn(async move {
barrier.wait().await;
awaiter.await_terminal("proc").await
}));
}
barrier.wait().await;
let output = success(serde_json::json!({ "raced": true }));
registry
.complete_process("proc", output.clone())
.await
.expect("complete");
for waiter in waiters {
let resolved = tokio::time::timeout(Duration::from_secs(2), waiter)
.await
.expect("each racing waiter resolves under 2s")
.expect("join waiter")
.expect("await terminal");
assert_eq!(
resolved, output,
"every concurrent waiter resolves with identical terminal output"
);
}
}
#[tokio::test]
async fn driver_reattach_after_terminal_short_circuits_without_engine_call() {
use std::sync::atomic::Ordering;
let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
let calls = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let driver = crate::ProcessWorkDriver::new(raw, Arc::new(NoopRunHandle)).with_attach(
Arc::new(CountingAttach {
calls: Arc::clone(&calls),
}),
);
let registry = driver.process_registry();
registry
.register_process(registration("proc"))
.await
.expect("register");
let output = success(serde_json::json!("reattached"));
registry
.complete_process("proc", output.clone())
.await
.expect("complete");
let start = std::time::Instant::now();
let resolved = driver.await_terminal("proc").await.expect("await terminal");
assert_eq!(resolved, output);
assert_eq!(
calls.load(Ordering::SeqCst),
0,
"a terminal short-circuit must never call the engine attach"
);
assert!(
start.elapsed() < Duration::from_millis(500),
"a short-circuit resolves without any backoff wait"
);
}
#[derive(Clone, Default)]
struct LossySink {
seen: Arc<Mutex<Vec<u64>>>,
dropped: Arc<Mutex<Vec<u64>>>,
}
#[async_trait::async_trait]
impl ProcessEventSink for LossySink {
async fn emit(&self, event: &ProcessEvent) {
if event.sequence.is_multiple_of(2) {
self.dropped.lock().expect("sink lock").push(event.sequence);
} else {
self.seen.lock().expect("sink lock").push(event.sequence);
}
}
}
#[tokio::test]
async fn lossy_sink_still_reconciles_complete_log_from_events_after() {
let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
let sink = LossySink::default();
let (registry, _hub) = watch_process_registry_with_sink(raw, Some(Arc::new(sink.clone())));
registry
.register_process(registration_with_events("proc", &["producer.step"]))
.await
.expect("register");
const EVENTS: u64 = 6;
for _ in 0..EVENTS {
registry
.append_event(
"proc",
ProcessEventAppendRequest::new("producer.step", serde_json::json!({})),
)
.await
.expect("append");
}
registry
.complete_process("proc", success(serde_json::json!("done")))
.await
.expect("complete");
assert!(
!sink.dropped.lock().expect("sink lock").is_empty(),
"the lossy sink must drop at least one emit for the scenario to be meaningful"
);
assert!(
(sink.seen.lock().expect("sink lock").len() as u64) < EVENTS,
"the sink observed fewer events than were appended"
);
let reconciled = registry
.events_after("proc", 0)
.await
.expect("events")
.into_iter()
.filter(|event| event.event_type == "producer.step")
.map(|event| event.sequence)
.collect::<Vec<_>>();
assert_eq!(
reconciled,
(1..=EVENTS).collect::<Vec<_>>(),
"events_after reconciles the complete non-terminal log despite push loss"
);
}
}