#![cfg(any(test, feature = "testing"))]
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::sync::Mutex;
use crate::plugin::PluginError;
use super::events::{
ProcessAwaitOutput, ProcessEvent, ProcessEventAppendRequest, ProcessEventAppendResult,
ProcessTerminalState,
};
use super::model::{
AbandonRequest, PROCESS_LEASE_SCHEMA_VERSION, ProcessExternalRef, ProcessHandleDescriptor,
ProcessHandleGrant, ProcessHandleGrantEntry, ProcessLease, ProcessLeaseClaimOutcome,
ProcessLeaseCompletion, ProcessListFilter, ProcessRecord, ProcessRegistration,
ProcessSessionDeleteReport, ProcessStarted, SessionScope, SessionScopeId, WaitState,
};
use super::registry::{ProcessPruneReport, ProcessRegistry};
use super::time::current_epoch_ms;
use super::validation::{
ensure_core_event_types, prepare_process_event_append, process_registration_hash,
validate_process_registration,
};
pub struct TestLocalProcessRegistry {
durability_tier: crate::DurabilityTier,
managed: Arc<Mutex<ManagedProcessMap>>,
grants: Arc<Mutex<ManagedGrantMap>>,
leases: Arc<Mutex<ManagedLeaseMap>>,
}
impl Default for TestLocalProcessRegistry {
fn default() -> Self {
Self {
durability_tier: crate::DurabilityTier::Inline,
managed: Arc::new(Mutex::new(HashMap::new())),
grants: Arc::new(Mutex::new(HashMap::new())),
leases: Arc::new(Mutex::new(HashMap::new())),
}
}
}
type ManagedProcessMap = HashMap<String, ManagedProcessRecord>;
type ManagedGrantMap = HashMap<SessionScopeId, HashMap<String, ProcessHandleGrant>>;
type ManagedLeaseMap = HashMap<String, ProcessLease>;
struct ManagedProcessRecord {
record: ProcessRecord,
events: Vec<ProcessEvent>,
keyed_events: HashMap<String, (String, ProcessEvent)>,
acked_wakes: HashSet<u64>,
}
impl TestLocalProcessRegistry {
pub fn with_durability_tier(mut self, durability_tier: crate::DurabilityTier) -> Self {
self.durability_tier = durability_tier;
self
}
pub fn durable() -> Self {
Self::default().with_durability_tier(crate::DurabilityTier::Durable)
}
async fn insert_process(
&self,
mut registration: ProcessRegistration,
) -> Result<ProcessRecord, PluginError> {
ensure_core_event_types(&mut registration);
validate_process_registration(®istration)?;
let registration_hash = process_registration_hash(®istration)?;
let mut managed = self.managed.lock().await;
if let Some(existing) = managed.get(®istration.id) {
if existing.record.registration_hash == registration_hash {
return Ok(existing.record.clone());
}
return Err(PluginError::Session(format!(
"process `{}` registration hash conflict: existing {}, new {}",
registration.id, existing.record.registration_hash, registration_hash
)));
}
let id = registration.id.clone();
let record = ProcessRecord::from_prepared_registration(
registration,
registration_hash,
current_epoch_ms(),
);
managed.insert(
id.clone(),
ManagedProcessRecord {
record: record.clone(),
events: Vec::new(),
keyed_events: HashMap::new(),
acked_wakes: HashSet::new(),
},
);
Ok(record)
}
}
#[async_trait::async_trait]
impl ProcessRegistry for TestLocalProcessRegistry {
fn durability_tier(&self) -> crate::DurabilityTier {
self.durability_tier
}
async fn register_process(
&self,
registration: ProcessRegistration,
) -> Result<ProcessRecord, PluginError> {
self.insert_process(registration).await
}
async fn set_external_ref(
&self,
process_id: &str,
external_ref: ProcessExternalRef,
) -> Result<ProcessRecord, PluginError> {
let mut managed = self.managed.lock().await;
let Some(record) = managed.get_mut(process_id) else {
return Err(PluginError::Session(format!(
"unknown process `{process_id}`"
)));
};
if let Some(existing) = &record.record.external_ref {
if existing == &external_ref {
return Ok(record.record.clone());
}
return Err(process_external_ref_conflict(
process_id,
existing,
&external_ref,
));
}
record.record.external_ref = Some(external_ref);
record.record.updated_at_ms = current_epoch_ms();
Ok(record.record.clone())
}
async fn grant_handle(
&self,
session_scope: &SessionScope,
process_id: &str,
descriptor: ProcessHandleDescriptor,
) -> Result<ProcessHandleGrant, PluginError> {
if self.get_process(process_id).await.is_none() {
return Err(PluginError::Session(format!(
"unknown process `{process_id}`"
)));
}
let grant = ProcessHandleGrant {
session_id: session_scope.session_id.clone(),
process_id: process_id.to_string(),
descriptor,
};
self.grants
.lock()
.await
.entry(session_scope.id())
.or_default()
.insert(process_id.to_string(), grant.clone());
Ok(grant)
}
async fn revoke_handle(
&self,
session_scope: &SessionScope,
process_id: &str,
) -> Result<(), PluginError> {
if let Some(session_grants) = self.grants.lock().await.get_mut(&session_scope.id()) {
session_grants.remove(process_id);
}
Ok(())
}
async fn transfer_handle_grants(
&self,
from_scope: &SessionScope,
to_scope: &SessionScope,
process_ids: &[String],
) -> Result<(), PluginError> {
let mut grants = self.grants.lock().await;
let from_scope_id = from_scope.id();
let to_scope_id = to_scope.id();
for process_id in process_ids {
let grant = grants
.get_mut(&from_scope_id)
.and_then(|session_grants| session_grants.remove(process_id))
.ok_or_else(|| {
PluginError::Session(format!(
"process handle `{process_id}` is not granted to session `{}`",
from_scope.session_id
))
})?;
grants.entry(to_scope_id.clone()).or_default().insert(
process_id.clone(),
ProcessHandleGrant {
session_id: to_scope.session_id.clone(),
process_id: process_id.clone(),
descriptor: grant.descriptor,
},
);
}
Ok(())
}
async fn list_handle_grants(
&self,
session_scope: &SessionScope,
) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
let grants = self
.grants
.lock()
.await
.get(&session_scope.id())
.cloned()
.unwrap_or_default();
let managed = self.managed.lock().await;
let mut entries = grants
.into_values()
.filter_map(|grant| {
managed
.get(&grant.process_id)
.map(|record| (grant, record.record.clone()))
})
.collect::<Vec<_>>();
entries.sort_by(|(left, _), (right, _)| left.process_id.cmp(&right.process_id));
Ok(entries)
}
async fn list_live_handle_grants(
&self,
session_scope: &SessionScope,
) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
let grants = self
.grants
.lock()
.await
.get(&session_scope.id())
.cloned()
.unwrap_or_default();
let managed = self.managed.lock().await;
let mut entries = grants
.into_values()
.filter_map(|grant| {
managed
.get(&grant.process_id)
.filter(|record| !record.record.is_terminal())
.map(|record| (grant, record.record.clone()))
})
.collect::<Vec<_>>();
entries.sort_by(|(left, _), (right, _)| left.process_id.cmp(&right.process_id));
Ok(entries)
}
async fn has_handle_grant(
&self,
session_scope: &SessionScope,
process_id: &str,
) -> Result<bool, PluginError> {
let session_scope_id = session_scope.id();
let granted = self
.grants
.lock()
.await
.get(&session_scope_id)
.is_some_and(|session_grants| session_grants.contains_key(process_id));
if !granted {
return Ok(false);
}
Ok(self.managed.lock().await.contains_key(process_id))
}
async fn handle_grants_for_process(
&self,
process_id: &str,
) -> Result<Vec<ProcessHandleGrant>, PluginError> {
if self.get_process(process_id).await.is_none() {
return Err(PluginError::Session(format!(
"unknown process `{process_id}`"
)));
}
let grants = self.grants.lock().await;
let mut entries = grants
.values()
.filter_map(|session_grants| session_grants.get(process_id).cloned())
.collect::<Vec<_>>();
entries.sort_by(|left, right| left.session_id.cmp(&right.session_id));
Ok(entries)
}
async fn delete_session_process_state(
&self,
session_id: &str,
) -> Result<ProcessSessionDeleteReport, PluginError> {
let removed = {
let mut grants = self.grants.lock().await;
let mut removed = Vec::new();
grants.retain(|_, session_grants| {
if session_grants
.values()
.next()
.is_some_and(|grant| grant.session_id == session_id)
{
removed.extend(session_grants.drain().map(|(_, grant)| grant));
false
} else {
true
}
});
removed
};
let mut managed = self.managed.lock().await;
let grants = self.grants.lock().await;
let mut orphaned_process_ids = Vec::new();
let mut preserved_process_ids = Vec::new();
for grant in &removed {
let Some(record) = managed.get(&grant.process_id) else {
continue;
};
if record.record.is_terminal() {
continue;
}
let still_granted = grants
.values()
.any(|session_grants| session_grants.contains_key(&grant.process_id));
if still_granted {
preserved_process_ids.push(grant.process_id.clone());
} else {
orphaned_process_ids.push(grant.process_id.clone());
}
}
for record in managed.values_mut() {
let _ = record.record.clear_wake_target_for_session(session_id);
}
orphaned_process_ids.sort();
orphaned_process_ids.dedup();
preserved_process_ids.sort();
preserved_process_ids.dedup();
Ok(ProcessSessionDeleteReport {
session_id: session_id.to_string(),
revoked_handle_count: removed.len(),
deleted_wake_count: 0,
orphaned_process_ids,
preserved_process_ids,
})
}
async fn append_event(
&self,
process_id: &str,
request: ProcessEventAppendRequest,
) -> Result<ProcessEventAppendResult, PluginError> {
let mut managed = self.managed.lock().await;
let Some(record) = managed.get_mut(process_id) else {
return Err(PluginError::Session(format!(
"unknown process `{process_id}`"
)));
};
let replay_lookup = request
.replay
.as_ref()
.and_then(|replay| record.keyed_events.get(replay.key.as_str()))
.map(|(hash, event)| (hash.clone(), event.clone()));
let sequence = record.events.len() as u64 + 1;
let prepared = prepare_process_event_append(
&record.record,
request,
sequence,
replay_lookup,
current_epoch_ms(),
)?;
match prepared {
super::ProcessEventAppendPlan::Replay {
event,
repair_status,
wake_delivery,
occurred_at_ms,
} => {
if let Some(status) = repair_status {
super::apply_process_status_projection(
&mut record.record,
status,
occurred_at_ms,
);
}
Ok(ProcessEventAppendResult {
event,
wake_delivery,
})
}
super::ProcessEventAppendPlan::Insert {
event,
payload_hash,
status_update,
wake_delivery,
occurred_at_ms,
} => {
if let Some(status) = status_update {
super::apply_process_status_projection(
&mut record.record,
status,
occurred_at_ms,
);
} else {
record.record.updated_at_ms = occurred_at_ms;
}
record.events.push(event.clone());
if let Some(replay) = event.invocation.replay.clone() {
record
.keyed_events
.insert(replay.key, (payload_hash, event.clone()));
}
Ok(ProcessEventAppendResult {
event,
wake_delivery,
})
}
}
}
async fn events_after(
&self,
process_id: &str,
after_sequence: u64,
) -> Result<Vec<ProcessEvent>, PluginError> {
let managed = self.managed.lock().await;
let Some(record) = managed.get(process_id) else {
return Err(PluginError::Session(format!(
"unknown process `{process_id}`"
)));
};
Ok(record
.events
.iter()
.filter(|event| event.sequence > after_sequence)
.cloned()
.collect())
}
async fn wake_events_after(
&self,
process_id: &str,
after_sequence: u64,
) -> Result<Vec<ProcessEvent>, PluginError> {
let managed = self.managed.lock().await;
let Some(record) = managed.get(process_id) else {
return Err(PluginError::Session(format!(
"unknown process `{process_id}`"
)));
};
Ok(record
.events
.iter()
.filter(|event| event.sequence > after_sequence)
.filter(|event| event.semantics.wake.is_some())
.filter(|event| !record.acked_wakes.contains(&event.sequence))
.cloned()
.collect())
}
async fn complete_process(
&self,
process_id: &str,
await_output: ProcessAwaitOutput,
) -> Result<ProcessRecord, PluginError> {
let event_type = match await_output.terminal_state() {
ProcessTerminalState::Completed => "process.completed",
ProcessTerminalState::Failed => "process.failed",
ProcessTerminalState::Cancelled => "process.cancelled",
ProcessTerminalState::Abandoned => "process.abandoned",
};
self.append_event(
process_id,
ProcessEventAppendRequest::new(
event_type,
serde_json::json!({ "await_output": await_output }),
)
.with_replay_key(format!("process:{process_id}:terminal:{event_type}")),
)
.await?;
self.get_process(process_id).await.ok_or_else(|| {
PluginError::Session(format!(
"unknown process `{process_id}` after terminal event"
))
})
}
async fn record_first_started(
&self,
process_id: &str,
started: ProcessStarted,
) -> Result<ProcessRecord, PluginError> {
let mut managed = self.managed.lock().await;
let Some(record) = managed.get_mut(process_id) else {
return Err(PluginError::Session(format!(
"unknown process `{process_id}`"
)));
};
if record.record.first_started.is_none() {
record.record.first_started = Some(Box::new(started));
record.record.updated_at_ms = current_epoch_ms();
}
Ok(record.record.clone())
}
async fn request_process_abandon(
&self,
process_id: &str,
request: AbandonRequest,
) -> Result<ProcessRecord, PluginError> {
let mut managed = self.managed.lock().await;
let Some(record) = managed.get_mut(process_id) else {
return Err(PluginError::Session(format!(
"unknown process `{process_id}`"
)));
};
if record.record.is_terminal() {
return Err(PluginError::Session(format!(
"terminal process `{process_id}` cannot accept an abandon request"
)));
}
if record.record.abandon_request.is_none() {
record.record.abandon_request = Some(Box::new(request));
record.record.updated_at_ms = current_epoch_ms();
}
Ok(record.record.clone())
}
async fn set_process_wait(
&self,
process_id: &str,
wait: WaitState,
) -> Result<ProcessRecord, PluginError> {
let mut managed = self.managed.lock().await;
let Some(record) = managed.get_mut(process_id) else {
return Err(PluginError::Session(format!(
"unknown process `{process_id}`"
)));
};
if record.record.is_terminal() {
return Err(PluginError::Session(format!(
"terminal process `{process_id}` cannot enter a wait state"
)));
}
record.record.wait = Some(wait);
record.record.updated_at_ms = current_epoch_ms();
Ok(record.record.clone())
}
async fn clear_process_wait(&self, process_id: &str) -> Result<ProcessRecord, PluginError> {
let mut managed = self.managed.lock().await;
let Some(record) = managed.get_mut(process_id) else {
return Err(PluginError::Session(format!(
"unknown process `{process_id}`"
)));
};
record.record.wait = None;
record.record.updated_at_ms = current_epoch_ms();
Ok(record.record.clone())
}
async fn get_process(&self, process_id: &str) -> Option<ProcessRecord> {
let managed = self.managed.lock().await;
managed.get(process_id).map(|record| record.record.clone())
}
async fn list_processes(
&self,
filter: &ProcessListFilter,
) -> Result<Vec<ProcessRecord>, PluginError> {
let managed = self.managed.lock().await;
let mut records = managed
.values()
.map(|record| record.record.clone())
.filter(|record| filter.matches_record(record))
.collect::<Vec<_>>();
records.sort_by(|a, b| a.id.cmp(&b.id));
Ok(records)
}
async fn ack_wake(&self, process_id: &str, sequence: u64) -> Result<(), PluginError> {
let mut managed = self.managed.lock().await;
let Some(record) = managed.get_mut(process_id) else {
return Err(PluginError::Session(format!(
"unknown process `{process_id}`"
)));
};
record.acked_wakes.insert(sequence);
Ok(())
}
async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, PluginError> {
let managed = self.managed.lock().await;
let mut records: Vec<ProcessRecord> = managed
.values()
.filter(|record| !record.record.is_terminal())
.map(|record| record.record.clone())
.collect();
records.sort_by(|a, b| a.id.cmp(&b.id));
Ok(records)
}
async fn claim_process_lease(
&self,
process_id: &str,
owner: &crate::LeaseOwnerIdentity,
lease_ttl_ms: u64,
) -> Result<ProcessLeaseClaimOutcome, PluginError> {
let mut leases = self.leases.lock().await;
let now = current_epoch_ms();
if let Some(current) = leases.get_mut(process_id)
&& !current.lease_token.is_empty()
&& current.expires_at_epoch_ms > now
{
if current.owner.same_incarnation(owner) {
current.expires_at_epoch_ms = now.saturating_add(lease_ttl_ms);
return Ok(ProcessLeaseClaimOutcome::Acquired(current.clone()));
}
return Ok(ProcessLeaseClaimOutcome::Busy {
holder: current.clone(),
});
}
let fencing_token = leases
.get(process_id)
.map_or(0, |current| current.fencing_token)
.saturating_add(1);
let lease = acquire_test_lease(process_id, owner, fencing_token, now, lease_ttl_ms);
leases.insert(process_id.to_string(), lease.clone());
Ok(ProcessLeaseClaimOutcome::Acquired(lease))
}
async fn reclaim_process_lease(
&self,
process_id: &str,
owner: &crate::LeaseOwnerIdentity,
observed_holder: &ProcessLease,
lease_ttl_ms: u64,
) -> Result<ProcessLeaseClaimOutcome, PluginError> {
let mut leases = self.leases.lock().await;
let now = current_epoch_ms();
let Some(current) = leases.get(process_id) else {
let lease = acquire_test_lease(process_id, owner, 1, now, lease_ttl_ms);
leases.insert(process_id.to_string(), lease.clone());
return Ok(ProcessLeaseClaimOutcome::Acquired(lease));
};
if current.lease_token.is_empty() || current.expires_at_epoch_ms <= now {
let lease = acquire_test_lease(
process_id,
owner,
current.fencing_token.saturating_add(1),
now,
lease_ttl_ms,
);
leases.insert(process_id.to_string(), lease.clone());
return Ok(ProcessLeaseClaimOutcome::Acquired(lease));
}
if observed_holder.process_id == process_id
&& current.owner.same_incarnation(&observed_holder.owner)
&& current.lease_token == observed_holder.lease_token
&& current.fencing_token == observed_holder.fencing_token
&& current.owner.is_definitely_dead_for_claimant(owner)
{
let lease = acquire_test_lease(
process_id,
owner,
current.fencing_token.saturating_add(1),
now,
lease_ttl_ms,
);
leases.insert(process_id.to_string(), lease.clone());
return Ok(ProcessLeaseClaimOutcome::Acquired(lease));
}
Ok(ProcessLeaseClaimOutcome::Busy {
holder: current.clone(),
})
}
async fn renew_process_lease(
&self,
lease: &ProcessLease,
lease_ttl_ms: u64,
) -> Result<ProcessLease, PluginError> {
let mut leases = self.leases.lock().await;
let now = current_epoch_ms();
let live = leases.get(&lease.process_id).filter(|current| {
!current.lease_token.is_empty()
&& current.owner.same_incarnation(&lease.owner)
&& current.lease_token == lease.lease_token
&& current.fencing_token == lease.fencing_token
&& current.expires_at_epoch_ms > now
});
if live.is_none() {
return Err(process_lease_expired(&lease.process_id));
}
let renewed = ProcessLease {
expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
..lease.clone()
};
leases.insert(lease.process_id.clone(), renewed.clone());
Ok(renewed)
}
async fn get_process_lease(
&self,
process_id: &str,
) -> Result<Option<ProcessLease>, PluginError> {
Ok(self
.leases
.lock()
.await
.get(process_id)
.filter(|lease| !lease.lease_token.is_empty())
.cloned())
}
async fn complete_process_lease(
&self,
completion: &ProcessLeaseCompletion,
) -> Result<(), PluginError> {
let mut leases = self.leases.lock().await;
if let Some(current) = leases.get_mut(&completion.process_id)
&& current.lease_token == completion.lease_token
{
current.owner = crate::LeaseOwnerIdentity::opaque("", "");
current.lease_token = String::new();
current.claimed_at_epoch_ms = 0;
current.expires_at_epoch_ms = 0;
}
Ok(())
}
async fn prune_terminal_processes(
&self,
cutoff_epoch_ms: u64,
) -> Result<ProcessPruneReport, PluginError> {
let mut pruned_events = 0;
let prunable: HashSet<String> = {
let mut managed = self.managed.lock().await;
let prunable: Vec<String> = managed
.iter()
.filter(|(_, record)| {
record.record.is_terminal() && record.record.updated_at_ms < cutoff_epoch_ms
})
.map(|(id, _)| id.clone())
.collect();
for id in &prunable {
if let Some(record) = managed.remove(id) {
pruned_events += record.events.len();
}
}
prunable.into_iter().collect()
};
{
let mut grants = self.grants.lock().await;
for session_grants in grants.values_mut() {
session_grants.retain(|process_id, _| !prunable.contains(process_id));
}
grants.retain(|_, session_grants| !session_grants.is_empty());
}
self.leases
.lock()
.await
.retain(|process_id, _| !prunable.contains(process_id));
Ok(ProcessPruneReport {
pruned_processes: prunable.len(),
pruned_events,
})
}
}
fn acquire_test_lease(
process_id: &str,
owner: &crate::LeaseOwnerIdentity,
fencing_token: u64,
now: u64,
lease_ttl_ms: u64,
) -> ProcessLease {
ProcessLease {
schema_version: PROCESS_LEASE_SCHEMA_VERSION,
process_id: process_id.to_string(),
owner: owner.clone(),
lease_token: format!(
"{process_id}:{}:{}:{now}:{fencing_token}",
owner.owner_id, owner.incarnation_id
),
fencing_token,
claimed_at_epoch_ms: now,
expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
}
}
fn process_lease_expired(process_id: &str) -> PluginError {
PluginError::Session(format!(
"process lease for `{process_id}` is missing or expired"
))
}
fn process_external_ref_conflict(
process_id: &str,
existing: &ProcessExternalRef,
new: &ProcessExternalRef,
) -> PluginError {
PluginError::Session(format!(
"process `{process_id}` external ref conflict: existing {existing:?}, new {new:?}"
))
}