Skip to main content

lash_core/runtime/process/
model.rs

1use std::collections::BTreeMap;
2use std::fmt;
3use std::sync::Arc;
4use std::sync::Mutex;
5
6use serde::{Deserialize, Serialize};
7
8use super::events::{
9    ProcessAwaitOutput, ProcessEventType, ProcessTerminalSemantics, ProcessTerminalState,
10    default_process_event_types,
11};
12use super::validation::{
13    ensure_core_event_types, process_registration_hash, validate_process_registration,
14};
15
/// Identifier of a process. This is a plain alias for `String`, so any string
/// is accepted wherever a `ProcessId` is expected.
pub type ProcessId = String;
17
18#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
19#[serde(transparent)]
20pub struct SessionScopeId(String);
21
22impl SessionScopeId {
23    pub fn new(value: impl Into<String>) -> Self {
24        Self(value.into())
25    }
26
27    pub fn as_str(&self) -> &str {
28        &self.0
29    }
30}
31
32impl fmt::Display for SessionScopeId {
33    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
34        formatter.write_str(&self.0)
35    }
36}
37
38impl From<String> for SessionScopeId {
39    fn from(value: String) -> Self {
40        Self::new(value)
41    }
42}
43
44impl From<&str> for SessionScopeId {
45    fn from(value: &str) -> Self {
46        Self::new(value)
47    }
48}
49
/// Durable executable input for a process.
///
/// `ToolCall`, `SessionTurn`, and `External` are kernel process primitives:
/// core owns their durable representation and execution semantics because they
/// are how the runtime coordinates tools, child sessions, and externally
/// completed work. `Engine` is the extension point for deployment-specific
/// process runtimes; those rows require a matching [`crate::ProcessEngine`] in
/// the host's process engine registry.
///
/// Serialized as an internally tagged object: the `type` field carries the
/// snake_case variant name (`tool_call`, `engine`, `session_turn`,
/// `external`). Variant and field order are part of the persisted shape, so
/// do not reorder them casually.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ProcessInput {
    /// A prepared tool call.
    ToolCall {
        call: crate::PreparedToolCall,
    },
    /// Input for a deployment-specific engine. `kind` is the engine-specific
    /// kind string; `payload` decodes as JSON `null` when the field is absent.
    Engine {
        kind: String,
        #[serde(default)]
        payload: serde_json::Value,
    },
    /// A session turn: the session creation request, the turn input, and the
    /// output contract. The two larger payloads are boxed (presumably to keep
    /// the enum small — confirm).
    SessionTurn {
        create_request: Box<crate::SessionCreateRequest>,
        turn_input: Box<crate::TurnInput>,
        output_contract: crate::ToolOutputContract,
    },
    /// Externally completed work. `metadata` decodes as JSON `null` when the
    /// field is absent.
    External {
        #[serde(default)]
        metadata: serde_json::Value,
    },
}
79
80impl Clone for ProcessInput {
81    fn clone(&self) -> Self {
82        match self {
83            Self::ToolCall { call } => Self::ToolCall { call: call.clone() },
84            Self::Engine { kind, payload } => Self::Engine {
85                kind: kind.clone(),
86                payload: payload.clone(),
87            },
88            Self::SessionTurn {
89                create_request,
90                turn_input,
91                output_contract,
92            } => Self::SessionTurn {
93                create_request: create_request.clone(),
94                turn_input: turn_input.clone(),
95                output_contract: output_contract.clone(),
96            },
97            Self::External { metadata } => Self::External {
98                metadata: metadata.clone(),
99            },
100        }
101    }
102}
103
104impl PartialEq for ProcessInput {
105    fn eq(&self, other: &Self) -> bool {
106        serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
107    }
108}
109
110impl ProcessInput {
111    pub fn engine_kind(&self) -> &'static str {
112        match self {
113            Self::ToolCall { .. } => "tool",
114            Self::Engine { .. } => "engine",
115            Self::SessionTurn { .. } => "session_turn",
116            Self::External { .. } => "external",
117        }
118    }
119
120    pub fn engine_specific_kind(&self) -> Option<&str> {
121        match self {
122            Self::Engine { kind, .. } => Some(kind.as_str()),
123            _ => None,
124        }
125    }
126}
127
/// Producer-declared contract stating what recovery may do with a process row
/// after owner loss. Required at registration and applied mechanically by the
/// sweep; never inferred at runtime. See ADR 0019.
///
/// There is deliberately no `Default` and no serde default: a producer that
/// forgets to declare a disposition must fail to compile rather than silently
/// inherit re-execution.
///
/// Serialized as a snake_case string: `rerunnable`, `owner_bound`, or
/// `externally_owned`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RecoveryDisposition {
    /// Another owner may re-execute the work — the contract for journaled,
    /// idempotent inputs (engine rows, session-turn rows).
    Rerunnable,
    /// The contract binds at first start: before any owner has begun execution
    /// any worker may claim the row; once execution has started, no other owner
    /// may ever re-execute it — abandonment is the only recovery.
    OwnerBound,
    /// Lash never executes the row at all. Closure comes from an external actor
    /// calling `complete_process`, or from a reconciled Abandon Request.
    ExternallyOwned,
}
149
/// Durable "execution started" fact: which owner began executing the row and
/// when. The runner writes it under its live lease immediately before executing
/// so the sweep can distinguish an OwnerBound row that has started (never
/// re-run) from one that has not (runnable by anyone). First-writer-wins.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProcessStarted {
    /// Lease owner that began executing the row.
    pub owner: crate::LeaseOwnerIdentity,
    /// When execution began, in milliseconds (presumably since the Unix
    /// epoch — confirm against the clock the runner uses).
    pub started_at_ms: u64,
}
159
/// Durable, non-terminal marker recording that a non-owner authorized
/// abandonment without proof the owner is gone. The sweep reconciles it into
/// [`ProcessTerminalState::Abandoned`](super::events::ProcessTerminalState::Abandoned)
/// only once the row's lease has lapsed; the marker never terminates anything
/// by itself and is visible to observers while pending. See ADR 0019.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct AbandonRequest {
    /// Who requested the abandonment. Free-form string; its format is not
    /// defined in this file.
    pub requested_by: String,
    /// When the request was made, in milliseconds (presumably since the Unix
    /// epoch — confirm against the writer).
    pub requested_at_ms: u64,
    /// Optional human-readable reason. Decodes as `None` when absent and is
    /// omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
}
172
173#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
174#[serde(transparent)]
175pub struct ProcessExecutionEnvRef(String);
176
177impl ProcessExecutionEnvRef {
178    pub fn new(value: impl Into<String>) -> Self {
179        Self(value.into())
180    }
181
182    pub fn as_str(&self) -> &str {
183        &self.0
184    }
185}
186
187impl fmt::Display for ProcessExecutionEnvRef {
188    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
189        formatter.write_str(&self.0)
190    }
191}
192
193#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
194#[serde(deny_unknown_fields)]
195pub struct ProcessExecutionEnvSpec {
196    #[serde(default)]
197    pub plugin_options: crate::PluginOptions,
198    #[serde(default)]
199    pub policy: crate::SessionPolicy,
200}
201
202impl ProcessExecutionEnvSpec {
203    pub fn new(plugin_options: crate::PluginOptions, policy: crate::SessionPolicy) -> Self {
204        Self {
205            plugin_options,
206            policy,
207        }
208    }
209
210    pub fn stable_ref(&self) -> Result<ProcessExecutionEnvRef, serde_json::Error> {
211        crate::stable_hash::stable_json_sha256_hex(self)
212            .map(|hash| ProcessExecutionEnvRef::new(format!("process-env:sha256:{hash}")))
213    }
214
215    pub fn to_store_bytes(&self) -> Result<Vec<u8>, serde_json::Error> {
216        serde_json::to_vec(self)
217    }
218
219    pub fn from_store_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
220        serde_json::from_slice(bytes)
221    }
222}
223
/// Storage for encoded process execution environments, keyed by
/// [`ProcessExecutionEnvRef`].
#[async_trait::async_trait]
pub trait ProcessExecutionEnvStore: Send + Sync {
    /// Durability tier of this store. Defaults to
    /// `crate::DurabilityTier::Inline` unless an implementation overrides it.
    fn durability_tier(&self) -> crate::DurabilityTier {
        crate::DurabilityTier::Inline
    }

    /// Stores `bytes` under `env_ref`.
    async fn put_process_execution_env(
        &self,
        env_ref: &ProcessExecutionEnvRef,
        bytes: &[u8],
    ) -> Result<(), crate::PluginError>;

    /// Fetches the bytes stored under `env_ref`. `Ok(None)` means nothing is
    /// stored for that reference (`load_process_execution_env` reports that
    /// case as a missing environment).
    async fn get_process_execution_env(
        &self,
        env_ref: &ProcessExecutionEnvRef,
    ) -> Result<Option<Vec<u8>>, crate::PluginError>;
}
241
/// In-process [`ProcessExecutionEnvStore`] backed by a mutex-guarded map from
/// reference text to encoded bytes.
#[derive(Default)]
pub struct InMemoryProcessExecutionEnvStore {
    envs: Mutex<BTreeMap<String, Vec<u8>>>,
}

impl InMemoryProcessExecutionEnvStore {
    /// Creates an empty store.
    pub fn new() -> Self {
        Self {
            envs: Mutex::new(BTreeMap::new()),
        }
    }
}
252
253#[async_trait::async_trait]
254impl ProcessExecutionEnvStore for InMemoryProcessExecutionEnvStore {
255    async fn put_process_execution_env(
256        &self,
257        env_ref: &ProcessExecutionEnvRef,
258        bytes: &[u8],
259    ) -> Result<(), crate::PluginError> {
260        self.envs
261            .lock()
262            .map_err(|_| {
263                crate::PluginError::Session("process execution env store lock poisoned".to_string())
264            })?
265            .insert(env_ref.as_str().to_string(), bytes.to_vec());
266        Ok(())
267    }
268
269    async fn get_process_execution_env(
270        &self,
271        env_ref: &ProcessExecutionEnvRef,
272    ) -> Result<Option<Vec<u8>>, crate::PluginError> {
273        Ok(self
274            .envs
275            .lock()
276            .map_err(|_| {
277                crate::PluginError::Session("process execution env store lock poisoned".to_string())
278            })?
279            .get(env_ref.as_str())
280            .cloned())
281    }
282}
283
284pub async fn persist_process_execution_env(
285    env_store: &dyn ProcessExecutionEnvStore,
286    spec: &ProcessExecutionEnvSpec,
287) -> Result<ProcessExecutionEnvRef, crate::PluginError> {
288    let env_ref = spec.stable_ref().map_err(|err| {
289        crate::PluginError::Session(format!("failed to hash process execution env: {err}"))
290    })?;
291    let bytes = spec.to_store_bytes().map_err(|err| {
292        crate::PluginError::Session(format!("failed to encode process execution env: {err}"))
293    })?;
294    env_store
295        .put_process_execution_env(&env_ref, &bytes)
296        .await?;
297    Ok(env_ref)
298}
299
300pub async fn load_process_execution_env(
301    env_store: &dyn ProcessExecutionEnvStore,
302    env_ref: &ProcessExecutionEnvRef,
303) -> Result<ProcessExecutionEnvSpec, crate::PluginError> {
304    let bytes = env_store
305        .get_process_execution_env(env_ref)
306        .await?
307        .ok_or_else(|| {
308            crate::PluginError::Session(format!("missing process execution env `{env_ref}`"))
309        })?;
310    ProcessExecutionEnvSpec::from_store_bytes(&bytes).map_err(|err| {
311        crate::PluginError::Session(format!(
312            "failed to decode process execution env `{env_ref}`: {err}"
313        ))
314    })
315}
316
317/// Execution-local context for process runners. Durable edges live on the
318/// process record.
319#[derive(Clone, Debug, Default, Serialize, Deserialize)]
320pub struct ProcessExecutionContext {
321    #[serde(default, skip_serializing_if = "Option::is_none")]
322    pub causal_invocation: Option<crate::RuntimeInvocation>,
323}
324
325impl ProcessExecutionContext {
326    pub fn with_causal_invocation(mut self, invocation: Option<crate::RuntimeInvocation>) -> Self {
327        self.causal_invocation = invocation;
328        self
329    }
330
331    pub fn is_empty(&self) -> bool {
332        self.causal_invocation.is_none()
333    }
334}
335
336#[derive(Clone)]
337pub struct ProcessOpScope<'scope> {
338    pub(crate) parent_invocation: Option<crate::RuntimeInvocation>,
339    pub(crate) effect_controller: crate::runtime::RuntimeEffectControllerHandle<'scope>,
340    pub(crate) agent_frame_id: Option<crate::AgentFrameId>,
341    pub(crate) target_agent_frame_id: Option<crate::AgentFrameId>,
342}
343
344impl<'scope> ProcessOpScope<'scope> {
345    pub fn new(scoped_effect_controller: crate::ScopedEffectController<'scope>) -> Self {
346        Self {
347            parent_invocation: None,
348            effect_controller: crate::runtime::RuntimeEffectControllerHandle::borrowed(
349                scoped_effect_controller,
350            ),
351            agent_frame_id: None,
352            target_agent_frame_id: None,
353        }
354    }
355
356    pub fn with_parent_invocation(
357        mut self,
358        parent_invocation: Option<crate::RuntimeInvocation>,
359    ) -> Self {
360        self.parent_invocation = parent_invocation;
361        self
362    }
363
364    pub fn with_agent_frame_id(mut self, agent_frame_id: Option<crate::AgentFrameId>) -> Self {
365        self.agent_frame_id = agent_frame_id;
366        self
367    }
368
369    pub fn with_target_agent_frame_id(
370        mut self,
371        agent_frame_id: Option<crate::AgentFrameId>,
372    ) -> Self {
373        self.target_agent_frame_id = agent_frame_id;
374        self
375    }
376
377    pub fn agent_frame_id(&self) -> Option<&str> {
378        self.agent_frame_id.as_deref()
379    }
380
381    pub fn target_agent_frame_id(&self) -> Option<&str> {
382        self.target_agent_frame_id.as_deref()
383    }
384
385    pub(crate) fn controller(&self) -> &dyn crate::RuntimeEffectController {
386        self.effect_controller.controller()
387    }
388}
389
/// Optional, non-request settings applied when starting a process. The
/// derived `Default` leaves every option unset.
#[derive(Clone, Debug, Default)]
pub struct ProcessStartOptions {
    /// Optional handle descriptor for the started process. Its meaning is
    /// defined by `ProcessHandleDescriptor`, which is not visible in this file.
    pub descriptor: Option<ProcessHandleDescriptor>,
    /// Runtime-internal spawn provenance override. Set by process execution
    /// contexts so children started *by a process* inherit the parent's
    /// originator and wake target instead of being stamped with the ephemeral
    /// execution scope. `None` means the session start path stamps the
    /// creating session (the in-session meaning of "start"). This rides
    /// options — not the request — so in-session callers cannot forge
    /// provenance through the session surface.
    pub spawn_provenance: Option<ProcessSpawnProvenance>,
}
402
/// Provenance a process-run context hands to its children: the chain's
/// originator and the chain's wake target. Mirrors the trigger fire path,
/// where the spawned process inherits the registrant and the grant is derived
/// from the wake target.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ProcessSpawnProvenance {
    /// Originator of the spawn chain.
    pub originator: ProcessOriginator,
    /// Wake target of the spawn chain, if there is one.
    pub wake_target: Option<SessionScope>,
}
412
413impl ProcessStartOptions {
414    pub fn new() -> Self {
415        Self::default()
416    }
417
418    pub fn with_descriptor(mut self, descriptor: ProcessHandleDescriptor) -> Self {
419        self.descriptor = Some(descriptor);
420        self
421    }
422
423    pub fn with_optional_descriptor(mut self, descriptor: Option<ProcessHandleDescriptor>) -> Self {
424        self.descriptor = descriptor;
425        self
426    }
427
428    pub fn with_spawn_provenance(mut self, spawn_provenance: ProcessSpawnProvenance) -> Self {
429        self.spawn_provenance = Some(spawn_provenance);
430        self
431    }
432
433    pub fn execution_context(&self, scope: &ProcessOpScope<'_>) -> ProcessExecutionContext {
434        ProcessExecutionContext {
435            causal_invocation: scope.parent_invocation.clone(),
436        }
437    }
438}
439
440/// Public host-facing request for starting a visible process handle.
441#[derive(Clone, Debug, Serialize, Deserialize)]
442pub struct ProcessStartRequest {
443    pub id: ProcessId,
444    pub input: ProcessInput,
445    pub disposition: RecoveryDisposition,
446    #[serde(default, skip_serializing_if = "Option::is_none")]
447    pub env_spec: Option<ProcessExecutionEnvSpec>,
448    pub originator: ProcessOriginator,
449    #[serde(default, skip_serializing_if = "Option::is_none")]
450    pub wake_target: Option<SessionScope>,
451    #[serde(default, skip_serializing_if = "Option::is_none")]
452    pub grant: Option<ProcessStartGrant>,
453    #[serde(default)]
454    pub event_types: Vec<ProcessEventType>,
455}
456
457impl ProcessStartRequest {
458    pub fn new(
459        id: impl Into<ProcessId>,
460        input: ProcessInput,
461        disposition: RecoveryDisposition,
462        originator: ProcessOriginator,
463    ) -> Self {
464        Self {
465            id: id.into(),
466            input,
467            disposition,
468            env_spec: None,
469            originator,
470            wake_target: None,
471            grant: None,
472            event_types: default_process_event_types(),
473        }
474    }
475
476    /// External placeholder start: `ProcessInput::External` is always
477    /// [`RecoveryDisposition::ExternallyOwned`] — lash never executes it.
478    pub fn external(
479        id: impl Into<ProcessId>,
480        originator: ProcessOriginator,
481        metadata: serde_json::Value,
482    ) -> Self {
483        Self::new(
484            id,
485            ProcessInput::External { metadata },
486            RecoveryDisposition::ExternallyOwned,
487            originator,
488        )
489    }
490
491    pub fn with_env_spec(mut self, env_spec: ProcessExecutionEnvSpec) -> Self {
492        self.env_spec = Some(env_spec);
493        self
494    }
495
496    pub fn with_wake_target(mut self, wake_target: Option<SessionScope>) -> Self {
497        self.wake_target = wake_target;
498        self
499    }
500
501    pub fn with_grant(mut self, grant: Option<ProcessStartGrant>) -> Self {
502        self.grant = grant;
503        self
504    }
505
506    pub fn with_event_types(
507        mut self,
508        event_types: impl IntoIterator<Item = ProcessEventType>,
509    ) -> Self {
510        self.event_types = event_types.into_iter().collect();
511        self
512    }
513
514    pub fn with_extra_event_types(
515        mut self,
516        event_types: impl IntoIterator<Item = ProcessEventType>,
517    ) -> Self {
518        self.event_types.extend(event_types);
519        self
520    }
521
522    pub fn into_registration(self, env_ref: Option<ProcessExecutionEnvRef>) -> ProcessRegistration {
523        ProcessRegistration::new(
524            self.id,
525            self.input,
526            self.disposition,
527            ProcessProvenance::new(self.originator),
528        )
529        .with_event_types(self.event_types)
530        .with_execution_env_ref(env_ref)
531        .with_wake_target(self.wake_target)
532    }
533}
534
/// A session, optionally narrowed to a single agent frame within it.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct SessionScope {
    /// Identifier of the session.
    pub session_id: String,
    /// Agent frame within the session, if the scope is frame-specific.
    /// Decodes as `None` when absent and is omitted from serialized output
    /// when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub agent_frame_id: Option<crate::AgentFrameId>,
}
541
542#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
543pub struct ProcessProvenance {
544    pub originator: ProcessOriginator,
545    #[serde(default, skip_serializing_if = "Option::is_none")]
546    pub caused_by: Option<crate::CausalRef>,
547}
548
549impl ProcessProvenance {
550    pub fn new(originator: ProcessOriginator) -> Self {
551        Self {
552            originator,
553            caused_by: None,
554        }
555    }
556
557    pub fn host() -> Self {
558        Self::new(ProcessOriginator::host())
559    }
560
561    pub fn session(scope: SessionScope) -> Self {
562        Self::new(ProcessOriginator::session(scope))
563    }
564
565    pub fn with_caused_by(mut self, caused_by: Option<crate::CausalRef>) -> Self {
566        self.caused_by = caused_by;
567        self
568    }
569}
570
571#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
572#[serde(tag = "type", rename_all = "snake_case")]
573pub enum ProcessOriginator {
574    Host,
575    Session { scope: SessionScope },
576}
577
578impl ProcessOriginator {
579    pub fn host() -> Self {
580        Self::Host
581    }
582
583    pub fn session(scope: SessionScope) -> Self {
584        Self::Session { scope }
585    }
586
587    pub fn scope_id(&self) -> String {
588        match self {
589            Self::Host => "host".to_string(),
590            Self::Session { scope } => scope.id().to_string(),
591        }
592    }
593}
594
595impl SessionScope {
596    pub fn new(session_id: impl Into<String>) -> Self {
597        Self {
598            session_id: session_id.into(),
599            agent_frame_id: None,
600        }
601    }
602
603    pub fn for_agent_frame(
604        session_id: impl Into<String>,
605        agent_frame_id: impl Into<crate::AgentFrameId>,
606    ) -> Self {
607        Self {
608            session_id: session_id.into(),
609            agent_frame_id: Some(agent_frame_id.into()),
610        }
611    }
612
613    pub fn id(&self) -> SessionScopeId {
614        match self.agent_frame_id.as_deref() {
615            Some(frame_id) if !frame_id.is_empty() => {
616                SessionScopeId::new(format!("session:{}/frame:{frame_id}", self.session_id))
617            }
618            _ => SessionScopeId::new(format!("session:{}", self.session_id)),
619        }
620    }
621
622    pub fn is_empty(&self) -> bool {
623        self.session_id.is_empty()
624    }
625}
626
/// Serializable process spec used to start or recover a runtime process.
#[derive(Debug, Serialize, Deserialize)]
pub struct ProcessRegistration {
    /// Identifier of the process being registered.
    pub id: ProcessId,
    /// Executable input, held behind an `Arc` so cloning a registration
    /// shares the input instead of deep-copying it.
    pub input: Arc<ProcessInput>,
    /// Declared recovery contract; there is no serde default for it.
    pub disposition: RecoveryDisposition,
    /// Identity of the process; `new` derives it from the input.
    pub identity: ProcessIdentity,
    /// Event types declared for the process. Decodes as an empty list when
    /// the field is absent.
    #[serde(default)]
    pub event_types: Vec<ProcessEventType>,
    /// Who originated the process and, optionally, what caused it.
    pub provenance: ProcessProvenance,
    /// Reference to the stored execution environment, if any. Omitted from
    /// serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub env_ref: Option<ProcessExecutionEnvRef>,
    /// Session scope to wake, if any. Omitted from serialized output when
    /// `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wake_target: Option<SessionScope>,
}
642
643impl Clone for ProcessRegistration {
644    fn clone(&self) -> Self {
645        Self {
646            id: self.id.clone(),
647            input: Arc::clone(&self.input),
648            disposition: self.disposition,
649            identity: self.identity.clone(),
650            event_types: self.event_types.clone(),
651            provenance: self.provenance.clone(),
652            env_ref: self.env_ref.clone(),
653            wake_target: self.wake_target.clone(),
654        }
655    }
656}
657
658impl ProcessRegistration {
659    pub fn new(
660        id: impl Into<ProcessId>,
661        input: ProcessInput,
662        disposition: RecoveryDisposition,
663        provenance: ProcessProvenance,
664    ) -> Self {
665        let identity = ProcessIdentity::from_process_input(&input);
666        Self {
667            id: id.into(),
668            input: Arc::new(input),
669            disposition,
670            identity,
671            event_types: default_process_event_types(),
672            provenance,
673            env_ref: None,
674            wake_target: None,
675        }
676    }
677
678    pub(crate) fn session_start_draft(
679        id: impl Into<ProcessId>,
680        input: ProcessInput,
681        disposition: RecoveryDisposition,
682    ) -> Self {
683        Self::new(id, input, disposition, ProcessProvenance::host())
684    }
685
686    pub fn with_process_provenance(mut self, provenance: ProcessProvenance) -> Self {
687        self.provenance = provenance;
688        self
689    }
690
691    pub fn with_execution_env_ref(mut self, env_ref: Option<ProcessExecutionEnvRef>) -> Self {
692        self.env_ref = env_ref;
693        self
694    }
695
696    pub fn with_wake_target(mut self, wake_target: Option<SessionScope>) -> Self {
697        self.wake_target = wake_target;
698        self
699    }
700
701    pub fn with_identity(mut self, identity: ProcessIdentity) -> Self {
702        self.identity = identity;
703        self
704    }
705
706    pub fn with_event_types(
707        mut self,
708        event_types: impl IntoIterator<Item = ProcessEventType>,
709    ) -> Self {
710        self.event_types = event_types.into_iter().collect();
711        self
712    }
713
714    pub fn with_extra_event_types(
715        mut self,
716        event_types: impl IntoIterator<Item = ProcessEventType>,
717    ) -> Self {
718        self.event_types.extend(event_types);
719        self
720    }
721}
722
/// Lifecycle status of a process: `Running`, or one of four terminal states,
/// each carrying the output handed to awaiters.
///
/// Serialized as an internally tagged object whose `state` field is the
/// snake_case variant name. `Running` is the `Default`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
#[serde(tag = "state", rename_all = "snake_case")]
pub enum ProcessStatus {
    /// Not terminal; carries no await output.
    #[default]
    Running,
    /// Terminal: completed.
    Completed {
        await_output: ProcessAwaitOutput,
    },
    /// Terminal: failed.
    Failed {
        await_output: ProcessAwaitOutput,
    },
    /// Terminal: cancelled.
    Cancelled {
        await_output: ProcessAwaitOutput,
    },
    /// Terminal: abandoned.
    Abandoned {
        await_output: ProcessAwaitOutput,
    },
}
741
742impl ProcessStatus {
743    pub fn from_terminal(terminal: ProcessTerminalSemantics) -> Self {
744        match terminal.state {
745            ProcessTerminalState::Completed => Self::Completed {
746                await_output: terminal.await_output,
747            },
748            ProcessTerminalState::Failed => Self::Failed {
749                await_output: terminal.await_output,
750            },
751            ProcessTerminalState::Cancelled => Self::Cancelled {
752                await_output: terminal.await_output,
753            },
754            ProcessTerminalState::Abandoned => Self::Abandoned {
755                await_output: terminal.await_output,
756            },
757        }
758    }
759
760    pub fn is_terminal(&self) -> bool {
761        !matches!(self, Self::Running)
762    }
763
764    pub fn label(&self) -> &'static str {
765        match self {
766            Self::Running => "running",
767            Self::Completed { .. } => "completed",
768            Self::Failed { .. } => "failed",
769            Self::Cancelled { .. } => "cancelled",
770            Self::Abandoned { .. } => "abandoned",
771        }
772    }
773
774    pub fn terminal_state(&self) -> Option<ProcessTerminalState> {
775        match self {
776            Self::Running => None,
777            Self::Completed { .. } => Some(ProcessTerminalState::Completed),
778            Self::Failed { .. } => Some(ProcessTerminalState::Failed),
779            Self::Cancelled { .. } => Some(ProcessTerminalState::Cancelled),
780            Self::Abandoned { .. } => Some(ProcessTerminalState::Abandoned),
781        }
782    }
783
784    pub fn await_output(&self) -> Option<&ProcessAwaitOutput> {
785        match self {
786            Self::Running => None,
787            Self::Completed { await_output }
788            | Self::Failed { await_output }
789            | Self::Cancelled { await_output }
790            | Self::Abandoned { await_output } => Some(await_output),
791        }
792    }
793
794    pub fn terminal_semantics(&self) -> Option<ProcessTerminalSemantics> {
795        Some(ProcessTerminalSemantics {
796            state: self.terminal_state()?,
797            await_output: self.await_output()?.clone(),
798        })
799    }
800}
801
/// Durable process row. Session-visible addressability lives in
/// [`ProcessHandleGrant`], not in the process record.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ProcessRecord {
    /// Process identifier, taken from the registration.
    pub id: ProcessId,
    /// Hash of the registration this row was built from.
    pub registration_hash: String,
    /// Executable input, shared behind an `Arc`.
    pub input: Arc<ProcessInput>,
    /// Declared recovery contract. Required with no serde default: pre-column
    /// durable rows cannot deserialize and are handled by each store's schema
    /// version bump (reject-and-recreate), never by an API/serde default.
    pub disposition: RecoveryDisposition,
    /// Process identity, taken from the registration.
    pub identity: ProcessIdentity,
    /// Event types declared for the process. Decodes as an empty list when
    /// the field is absent.
    #[serde(default)]
    pub event_types: Vec<ProcessEventType>,
    /// Who originated the process and, optionally, what caused it.
    pub provenance: ProcessProvenance,
    /// Reference to the stored execution environment, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub env_ref: Option<ProcessExecutionEnvRef>,
    /// Session scope to wake, if any. Cleared by
    /// `clear_wake_target_for_session` when it names the given session.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wake_target: Option<SessionScope>,
    /// Creation timestamp in milliseconds. Decodes as `0` when absent.
    #[serde(default)]
    pub created_at_ms: u64,
    /// Last-update timestamp in milliseconds. Decodes as `0` when absent.
    #[serde(default)]
    pub updated_at_ms: u64,
    /// Optional external reference. Its meaning is defined by
    /// `ProcessExternalRef`, which is not visible in this file.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub external_ref: Option<ProcessExternalRef>,
    /// Durable, lease-fenced execution-started fact (ADR 0019). `None` until a
    /// runner records it immediately before executing. Boxed so these
    /// usually-absent facts do not enlarge the pervasive `ProcessRecord` that
    /// flows through the runtime; serde treats `Option<Box<T>>` identically to
    /// `Option<T>`, so the persisted JSON is unchanged.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub first_started: Option<Box<ProcessStarted>>,
    /// Pending Abandon Request the sweep reconciles once the lease lapses.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub abandon_request: Option<Box<AbandonRequest>>,
    /// Current wait state, if any (presumably set while the process is
    /// blocked on a signal — confirm against the writer).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wait: Option<WaitState>,
    /// Lifecycle status. Decodes as `ProcessStatus::Running` (the enum
    /// default) when absent.
    #[serde(default)]
    pub status: ProcessStatus,
}
842
/// What a process is waiting on and since when.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WaitState {
    /// The thing being waited for.
    pub kind: WaitKind,
    /// When the wait began, in milliseconds (presumably since the Unix
    /// epoch — confirm against the writer).
    pub since_ms: u64,
}
848
/// Kinds of wait a process can be in. Serialized as an internally tagged
/// object whose `kind` field is the snake_case variant name (`signal`).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum WaitKind {
    /// Waiting for a signal. The fields identify the awaited signal; their
    /// exact matching semantics are defined by the code that sets and resolves
    /// waits, which is not visible in this file.
    Signal {
        name: String,
        event_type: String,
        key: String,
        ordinal: u64,
    },
}
859
860impl ProcessRecord {
861    pub fn from_registration(registration: ProcessRegistration) -> Self {
862        Self::from_registration_with_clock(registration, &crate::SystemClock)
863    }
864
865    pub fn from_registration_with_clock(
866        mut registration: ProcessRegistration,
867        clock: &dyn crate::Clock,
868    ) -> Self {
869        ensure_core_event_types(&mut registration);
870        validate_process_registration(&registration)
871            .expect("process registration should be valid before record construction");
872        let registration_hash = process_registration_hash(&registration)
873            .expect("process registration should hash before record construction");
874        Self::from_prepared_registration(registration, registration_hash, clock.timestamp_ms())
875    }
876
877    pub fn from_prepared_registration(
878        registration: ProcessRegistration,
879        registration_hash: String,
880        now_ms: u64,
881    ) -> Self {
882        Self {
883            id: registration.id,
884            registration_hash,
885            input: registration.input,
886            disposition: registration.disposition,
887            identity: registration.identity,
888            event_types: registration.event_types,
889            provenance: registration.provenance,
890            env_ref: registration.env_ref,
891            wake_target: registration.wake_target,
892            created_at_ms: now_ms,
893            updated_at_ms: now_ms,
894            external_ref: None,
895            first_started: None,
896            abandon_request: None,
897            wait: None,
898            status: ProcessStatus::Running,
899        }
900    }
901
902    pub fn is_terminal(&self) -> bool {
903        self.status.is_terminal()
904    }
905
906    pub fn clear_wake_target_for_session(&mut self, session_id: &str) -> bool {
907        self.clear_wake_target_for_session_with_clock(session_id, &crate::SystemClock)
908    }
909
910    pub fn clear_wake_target_for_session_with_clock(
911        &mut self,
912        session_id: &str,
913        clock: &dyn crate::Clock,
914    ) -> bool {
915        let should_clear = self
916            .wake_target
917            .as_ref()
918            .is_some_and(|scope| scope.session_id == session_id);
919        if should_clear {
920            self.wake_target = None;
921            self.updated_at_ms = clock.timestamp_ms();
922        }
923        should_clear
924    }
925
926    pub fn originator_scope_id(&self) -> String {
927        self.provenance.originator.scope_id()
928    }
929}
930
/// Canonical process identity stored alongside every durable process row.
///
/// `ProcessInput::Engine` keeps its payload opaque to core. Engines therefore
/// publish their visible kind, display label, and definition identity at the
/// registration boundary; list, summary, trigger, and observation paths read
/// this durable field instead of decoding engine payload conventions.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProcessIdentity {
    /// Visible process kind. `ProcessIdentity::from_process_input` uses
    /// `"tool"`, `"session_turn"`, `"external"`, or the engine's own kind.
    pub kind: String,
    /// Optional display label; defaults to `None` when absent on input and is
    /// left out of the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub label: Option<String>,
    /// Optional definition identity as opaque JSON; defaults to `None` when
    /// absent on input and is left out of the serialized form when `None`.
    /// `ProcessListFilter` compares this value for equality.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub definition: Option<serde_json::Value>,
}
945
946impl ProcessIdentity {
947    pub fn new(kind: impl Into<String>) -> Self {
948        Self {
949            kind: kind.into(),
950            label: None,
951            definition: None,
952        }
953    }
954
955    pub fn with_label(mut self, label: Option<impl Into<String>>) -> Self {
956        self.label = label.map(Into::into);
957        self
958    }
959
960    pub fn with_definition(mut self, definition: Option<serde_json::Value>) -> Self {
961        self.definition = definition;
962        self
963    }
964
965    pub fn from_process_input(input: &ProcessInput) -> Self {
966        match input {
967            ProcessInput::ToolCall { call } => {
968                Self::new("tool").with_label(Some(call.tool_name.clone()))
969            }
970            ProcessInput::Engine { kind, .. } => Self::new(kind.clone()),
971            ProcessInput::SessionTurn { create_request, .. } => {
972                let label = create_request
973                    .subagent
974                    .as_ref()
975                    .map(|subagent| subagent.capability.clone())
976                    .or_else(|| create_request.usage_source.clone())
977                    .or_else(|| create_request.session_id.clone());
978                Self::new("session_turn").with_label(label)
979            }
980            ProcessInput::External { metadata } => {
981                let label = metadata
982                    .get("label")
983                    .or_else(|| metadata.get("name"))
984                    .or_else(|| metadata.get("title"))
985                    .and_then(serde_json::Value::as_str)
986                    .map(str::to_string);
987                Self::new("external").with_label(label)
988            }
989        }
990    }
991}
992
/// Wire-format version stamped on every persisted [`ProcessLease`].
///
/// Bump when the on-wire shape of `ProcessLease` changes in a way that older
/// code cannot safely deserialize. Version 2 replaced the bare `owner_id`
/// string with a full [`LeaseOwnerIdentity`](crate::LeaseOwnerIdentity)
/// carrying incarnation and liveness metadata for fenced reclaim.
///
/// The value has the same type as `ProcessLease::schema_version` (`u32`).
pub const PROCESS_LEASE_SCHEMA_VERSION: u32 = 2;
1000
/// Durable lease over a non-terminal background process.
///
/// The lease pair `(owner, lease_token)` plus `fencing_token` are how lash guarantees that
/// one non-terminal process is re-executed by exactly one worker at a time —
/// even after a crash, even across two workers that both sweep the same
/// registry for recoverable work. The durable backend
/// (`lash-sqlite-store`) uses these to serialize concurrent claims on the same
/// `process_id`; future distributed durable backends use the *same* fields to
/// coordinate workers that don't share a file system.
///
/// The owner is a full [`LeaseOwnerIdentity`](crate::LeaseOwnerIdentity):
/// its persisted liveness metadata is what lets a sweeping worker prove a
/// busy holder is *definitely dead* and reclaim the lease before the TTL
/// through [`ProcessRegistry::reclaim_process_lease`](super::ProcessRegistry::reclaim_process_lease),
/// mirroring the session execution lane.
///
/// **This is not single-process theatre.** The owner / fencing-token /
/// lease-token triple is the public contract that lets any backend detect and
/// reject stale writers. Treat it as load-bearing, not defensive.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ProcessLease {
    /// Wire-format version of this lease; see `PROCESS_LEASE_SCHEMA_VERSION`.
    pub schema_version: u32,
    /// Id of the process the lease covers.
    pub process_id: ProcessId,
    /// Identity of the lease holder, including its liveness metadata.
    pub owner: crate::LeaseOwnerIdentity,
    /// Token that, paired with `owner`, identifies this particular lease.
    pub lease_token: String,
    /// Fencing token used to detect and reject stale writers.
    /// NOTE(review): presumably increases with each claim — confirm against the backend.
    pub fencing_token: u64,
    /// When the lease was claimed; epoch milliseconds per the field name.
    pub claimed_at_epoch_ms: u64,
    /// When the lease expires; epoch milliseconds per the field name.
    pub expires_at_epoch_ms: u64,
}
1030
/// Outcome of claiming (or reclaiming) a [`ProcessLease`].
///
/// Mirrors [`SessionExecutionLeaseClaimOutcome`](crate::SessionExecutionLeaseClaimOutcome):
/// a busy outcome carries the observed holder so the claimant can assess its
/// liveness and perform a fenced reclaim on exactly the lease it observed.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ProcessLeaseClaimOutcome {
    /// The claim succeeded; carries the lease that was granted.
    Acquired(ProcessLease),
    /// Another holder has the lease; `holder` is the lease that was observed.
    Busy { holder: ProcessLease },
}
1041
1042impl ProcessLeaseClaimOutcome {
1043    pub fn acquired(self) -> Option<ProcessLease> {
1044        match self {
1045            Self::Acquired(lease) => Some(lease),
1046            Self::Busy { .. } => None,
1047        }
1048    }
1049}
1050
/// The `(process_id, lease_token)` pair of a [`ProcessLease`], without its
/// owner, fencing token, or timestamps.
///
/// NOTE(review): presumably handed to the registry when leased work finishes —
/// confirm against the call sites.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ProcessLeaseCompletion {
    /// Id of the leased process.
    pub process_id: ProcessId,
    /// Lease token copied from the lease.
    pub lease_token: String,
}
1056
1057impl ProcessLeaseCompletion {
1058    pub fn from_lease(lease: &ProcessLease) -> Self {
1059        Self {
1060            process_id: lease.process_id.clone(),
1061            lease_token: lease.lease_token.clone(),
1062        }
1063    }
1064}
1065
/// Durable backend reference for background work accepted outside the local process.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct ProcessExternalRef {
    /// Name of the backend that accepted the work.
    /// NOTE(review): naming scheme not visible here — confirm with the producers.
    pub backend: String,
    /// Identifier of the work; presumably scoped to `backend` — TODO confirm.
    pub id: String,
    /// Optional free-form JSON metadata; defaults to `None` when absent on
    /// input and is left out of the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub metadata: Option<serde_json::Value>,
}
1074
/// Optional kind and label describing a process handle.
///
/// Both fields default to `None` when absent on input and are left out of the
/// serialized form when `None`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProcessHandleDescriptor {
    /// Kind shown for the handle, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub kind: Option<String>,
    /// Label shown for the handle, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub label: Option<String>,
}
1082
1083impl ProcessHandleDescriptor {
1084    pub fn new(kind: Option<impl Into<String>>, label: Option<impl Into<String>>) -> Self {
1085        Self {
1086            kind: kind.map(Into::into),
1087            label: label.map(Into::into),
1088        }
1089    }
1090}
1091
/// Associates a session with a process id and the descriptor for that handle.
///
/// NOTE(review): presumably records that the session may address the process —
/// confirm against the registry that stores grants.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProcessHandleGrant {
    /// Id of the session the grant belongs to.
    pub session_id: String,
    /// Id of the process the grant names.
    pub process_id: ProcessId,
    /// Kind/label descriptor carried with the grant.
    pub descriptor: ProcessHandleDescriptor,
}
1098
/// A handle grant paired with a process record (presumably the record named by
/// the grant's `process_id` — TODO confirm at the producing call sites).
pub type ProcessHandleGrantEntry = (ProcessHandleGrant, ProcessRecord);
1100
/// Payload-free lifecycle status of a process.
///
/// Each variant corresponds to the `ProcessStatus` variant of the same name
/// with its payload dropped. Serialized in `snake_case`; `Running` is the
/// default.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProcessLifecycleStatus {
    /// The only non-terminal status.
    #[default]
    Running,
    /// Terminal: maps to `ProcessTerminalState::Completed`.
    Completed,
    /// Terminal: maps to `ProcessTerminalState::Failed`.
    Failed,
    /// Terminal: maps to `ProcessTerminalState::Cancelled`.
    Cancelled,
    /// Terminal: maps to `ProcessTerminalState::Abandoned`.
    Abandoned,
}
1111
1112impl ProcessLifecycleStatus {
1113    pub fn label(self) -> &'static str {
1114        match self {
1115            Self::Running => "running",
1116            Self::Completed => "completed",
1117            Self::Failed => "failed",
1118            Self::Cancelled => "cancelled",
1119            Self::Abandoned => "abandoned",
1120        }
1121    }
1122
1123    pub fn is_terminal(self) -> bool {
1124        !matches!(self, Self::Running)
1125    }
1126
1127    pub fn terminal_state(self) -> Option<ProcessTerminalState> {
1128        match self {
1129            Self::Running => None,
1130            Self::Completed => Some(ProcessTerminalState::Completed),
1131            Self::Failed => Some(ProcessTerminalState::Failed),
1132            Self::Cancelled => Some(ProcessTerminalState::Cancelled),
1133            Self::Abandoned => Some(ProcessTerminalState::Abandoned),
1134        }
1135    }
1136}
1137
1138impl From<&ProcessStatus> for ProcessLifecycleStatus {
1139    fn from(status: &ProcessStatus) -> Self {
1140        match status {
1141            ProcessStatus::Running => Self::Running,
1142            ProcessStatus::Completed { .. } => Self::Completed,
1143            ProcessStatus::Failed { .. } => Self::Failed,
1144            ProcessStatus::Cancelled { .. } => Self::Cancelled,
1145            ProcessStatus::Abandoned { .. } => Self::Abandoned,
1146        }
1147    }
1148}
1149
1150impl From<ProcessStatus> for ProcessLifecycleStatus {
1151    fn from(status: ProcessStatus) -> Self {
1152        Self::from(&status)
1153    }
1154}
1155
/// Serializable summary of a process handle: ids, descriptor, optional
/// definition, and lifecycle status.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProcessHandleSummary {
    /// Handle type tag, serialized under the `__handle__` key.
    /// `ProcessHandleSummary::new` sets it to `"process"`.
    #[serde(rename = "__handle__")]
    pub handle_type: String,
    /// Process id; `ProcessHandleSummary::new` sets it to the same value as `process_id`.
    pub id: ProcessId,
    /// Process id.
    pub process_id: ProcessId,
    /// Kind/label descriptor of the handle.
    pub descriptor: ProcessHandleDescriptor,
    /// Optional definition identity; defaults to `None` when absent on input
    /// and is left out of the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub definition: Option<serde_json::Value>,
    /// Lifecycle status of the process.
    pub status: ProcessLifecycleStatus,
}
1167
1168impl ProcessHandleSummary {
1169    pub fn new(
1170        process_id: impl Into<ProcessId>,
1171        descriptor: ProcessHandleDescriptor,
1172        status: ProcessLifecycleStatus,
1173    ) -> Self {
1174        let process_id = process_id.into();
1175        Self {
1176            handle_type: "process".to_string(),
1177            id: process_id.clone(),
1178            process_id,
1179            descriptor,
1180            definition: None,
1181            status,
1182        }
1183    }
1184
1185    pub fn with_definition(mut self, definition: Option<serde_json::Value>) -> Self {
1186        self.definition = definition;
1187        self
1188    }
1189
1190    pub fn from_grant_record(grant: ProcessHandleGrant, record: ProcessRecord) -> Self {
1191        let definition = record.identity.definition.clone();
1192        Self::new(
1193            record.id,
1194            grant.descriptor,
1195            ProcessLifecycleStatus::from(record.status),
1196        )
1197        .with_definition(definition)
1198    }
1199}
1200
1201impl From<ProcessHandleGrantEntry> for ProcessHandleSummary {
1202    fn from((grant, record): ProcessHandleGrantEntry) -> Self {
1203        Self::from_grant_record(grant, record)
1204    }
1205}
1206
/// Process id together with its lifecycle status.
///
/// NOTE(review): the name suggests this is what a cancel operation returns —
/// confirm against the call sites.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProcessCancelSummary {
    /// Id of the process.
    pub process_id: ProcessId,
    /// Lifecycle status taken from the record the summary was built from.
    pub status: ProcessLifecycleStatus,
}
1212
1213impl ProcessCancelSummary {
1214    pub fn from_record(record: ProcessRecord) -> Self {
1215        Self {
1216            process_id: record.id,
1217            status: ProcessLifecycleStatus::from(record.status),
1218        }
1219    }
1220}
1221
/// Status facet of a `processes.list` filter.
///
/// Every variant except `Any` selects exactly one `ProcessLifecycleStatus`.
/// `Running` is the default.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum ProcessStatusFilter {
    /// Only running processes (the default).
    #[default]
    Running,
    /// Only completed processes.
    Completed,
    /// Only failed processes.
    Failed,
    /// Only cancelled processes.
    Cancelled,
    /// Only abandoned processes.
    Abandoned,
    /// Processes in any status.
    Any,
}
1232
1233impl ProcessStatusFilter {
1234    pub fn decode(value: Option<&str>) -> Result<Self, String> {
1235        match value.unwrap_or("running") {
1236            "running" => Ok(Self::Running),
1237            "completed" => Ok(Self::Completed),
1238            "failed" => Ok(Self::Failed),
1239            "cancelled" => Ok(Self::Cancelled),
1240            "abandoned" => Ok(Self::Abandoned),
1241            "any" => Ok(Self::Any),
1242            other => Err(format!(
1243                "processes.list status must be `running`, `completed`, `failed`, `cancelled`, `abandoned`, or `any`, got `{other}`"
1244            )),
1245        }
1246    }
1247
1248    pub fn list_mode(self) -> ProcessListMode {
1249        match self {
1250            Self::Running => ProcessListMode::Live,
1251            Self::Completed | Self::Failed | Self::Cancelled | Self::Abandoned | Self::Any => {
1252                ProcessListMode::All
1253            }
1254        }
1255    }
1256
1257    pub fn matches(self, status: ProcessLifecycleStatus) -> bool {
1258        match self {
1259            Self::Running => status == ProcessLifecycleStatus::Running,
1260            Self::Completed => status == ProcessLifecycleStatus::Completed,
1261            Self::Failed => status == ProcessLifecycleStatus::Failed,
1262            Self::Cancelled => status == ProcessLifecycleStatus::Cancelled,
1263            Self::Abandoned => status == ProcessLifecycleStatus::Abandoned,
1264            Self::Any => true,
1265        }
1266    }
1267}
1268
/// Decoded `processes.list` filter.
///
/// A record matches when every facet that is set matches it.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct ProcessListFilter {
    /// When set, only records whose identity definition equals this value match.
    pub definition: Option<serde_json::Value>,
    /// Status facet; defaults to `ProcessStatusFilter::Running`.
    pub status: ProcessStatusFilter,
    /// When set, only records whose `wait` state is present (`true`) or absent
    /// (`false`) match.
    pub waiting: Option<bool>,
}
1275
1276impl ProcessListFilter {
1277    pub fn decode(args: &serde_json::Value) -> Result<Self, String> {
1278        let map = args
1279            .as_object()
1280            .ok_or_else(|| "processes.list expects a record of process filters".to_string())?;
1281        for key in map.keys() {
1282            match key.as_str() {
1283                "definition" | "status" | "waiting" => {}
1284                _ => return Err(format!("processes.list unknown filter `{key}`")),
1285            }
1286        }
1287        let definition = args.get("definition").cloned();
1288        let status =
1289            ProcessStatusFilter::decode(args.get("status").and_then(serde_json::Value::as_str))?;
1290        let waiting = args
1291            .get("waiting")
1292            .map(|value| {
1293                value
1294                    .as_bool()
1295                    .ok_or_else(|| "processes.list `waiting` filter must be a boolean".to_string())
1296            })
1297            .transpose()?;
1298        Ok(Self {
1299            definition,
1300            status,
1301            waiting,
1302        })
1303    }
1304
1305    pub fn list_mode(&self) -> ProcessListMode {
1306        self.status.list_mode()
1307    }
1308
1309    pub fn matches_entry(&self, entry: &ProcessHandleGrantEntry) -> bool {
1310        let (_grant, record) = entry;
1311        self.matches_record(record)
1312    }
1313
1314    pub fn matches_record(&self, record: &ProcessRecord) -> bool {
1315        let status = ProcessLifecycleStatus::from(&record.status);
1316        self.status.matches(status)
1317            && self
1318                .definition
1319                .as_ref()
1320                .is_none_or(|definition| record.identity.definition.as_ref() == Some(definition))
1321            && self
1322                .waiting
1323                .is_none_or(|waiting| record.wait.is_some() == waiting)
1324    }
1325}
1326
/// Listing mode derived from a status filter.
///
/// `ProcessStatusFilter::list_mode` yields `Live` for the `Running` filter and
/// `All` for every other filter. Serialized in `snake_case`; `Live` is the
/// default.
/// NOTE(review): how a registry interprets each mode is not visible here —
/// confirm against the listing implementation.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProcessListMode {
    /// Mode used when filtering for running processes (the default).
    #[default]
    Live,
    /// Mode used for every other status filter, including `Any`.
    All,
}
1334
1335impl ProcessListMode {
1336    pub fn as_str(self) -> &'static str {
1337        match self {
1338            Self::Live => "live",
1339            Self::All => "all",
1340        }
1341    }
1342}
1343
/// A session scope paired with a handle descriptor.
///
/// NOTE(review): presumably names the session that should receive a handle
/// when a process is started — confirm against the start path.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProcessStartGrant {
    /// Session scope the grant applies to.
    pub session_scope: SessionScope,
    /// Kind/label descriptor for the handle.
    pub descriptor: ProcessHandleDescriptor,
}
1349
/// Counts and process-id lists reported for one session.
///
/// NOTE(review): the field meanings below are read from their names only —
/// confirm against the code that builds the report.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProcessSessionDeleteReport {
    /// Id of the session the report is about.
    pub session_id: String,
    /// Number of handles revoked (per the field name).
    pub revoked_handle_count: usize,
    /// Number of wakes deleted (per the field name).
    pub deleted_wake_count: usize,
    /// Ids of processes reported as orphaned (per the field name).
    pub orphaned_process_ids: Vec<String>,
    /// Ids of processes reported as preserved (per the field name).
    pub preserved_process_ids: Vec<String>,
}
1358
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    /// Builds a JSON object with `component`, `pos`, and `name` entries, used
    /// as a definition value in these tests.
    fn process_value(component: &str, pos: usize, name: &str) -> serde_json::Value {
        json!({
            "component": component,
            "pos": pos,
            "name": name,
        })
    }

    /// A `Completed` status wrapping a successful `true` tool output.
    fn completed_status() -> ProcessStatus {
        let output = crate::ToolCallOutput::success(json!(true));
        ProcessStatus::Completed {
            await_output: ProcessAwaitOutput::from_tool_output(output),
        }
    }

    /// Builds a grant/record pair for a `test-engine` process with the given
    /// definition, label, and status.
    fn engine_entry(
        process_id: &str,
        definition: serde_json::Value,
        process_name: &str,
        status: ProcessStatus,
    ) -> ProcessHandleGrantEntry {
        let input = ProcessInput::Engine {
            kind: "test-engine".to_string(),
            payload: json!({
                "definition": definition.clone(),
                "label": process_name,
            }),
        };
        let identity = ProcessIdentity::new("test-engine")
            .with_label(Some(process_name))
            .with_definition(Some(definition));
        let env_ref = ProcessExecutionEnvRef::new(format!("process-env:test:{process_id}"));
        let registration = ProcessRegistration::new(
            process_id,
            input,
            RecoveryDisposition::Rerunnable,
            ProcessProvenance::host(),
        )
        .with_identity(identity)
        .with_execution_env_ref(Some(env_ref));

        let mut record = ProcessRecord::from_registration(registration);
        // Override the freshly built record's status with the one under test.
        record.status = status;

        let grant = ProcessHandleGrant {
            session_id: "session".to_string(),
            process_id: process_id.to_string(),
            descriptor: ProcessHandleDescriptor::new(Some("test-engine"), Some(process_name)),
        };
        (grant, record)
    }

    #[test]
    fn process_list_filter_matches_definition_and_status() {
        let target_ref = process_value("target", 0, "target");
        let other_ref = process_value("other", 1, "other");
        let filter_args = json!({
            "definition": target_ref,
            "status": "completed"
        });
        let filter = ProcessListFilter::decode(&filter_args).expect("decode filter");

        let matching = engine_entry("matching", target_ref, "target", completed_status());
        let wrong_definition =
            engine_entry("wrong-definition", other_ref, "other", completed_status());

        assert_eq!(filter.list_mode(), ProcessListMode::All);
        assert!(filter.matches_entry(&matching));
        assert!(!filter.matches_entry(&wrong_definition));
    }

    #[test]
    fn process_list_filter_matches_waiting_facet() {
        let process_ref = process_value("target", 0, "target");

        let (waiting_grant, mut waiting_record) = engine_entry(
            "waiting",
            process_ref.clone(),
            "target",
            ProcessStatus::Running,
        );
        waiting_record.wait = Some(WaitState {
            since_ms: 42,
            kind: WaitKind::Signal {
                name: "ready".to_string(),
                event_type: "signal.ready".to_string(),
                key: "process:waiting:signal.ready:1".to_string(),
                ordinal: 1,
            },
        });
        let waiting_entry: ProcessHandleGrantEntry = (waiting_grant, waiting_record);
        let idle_entry = engine_entry("idle", process_ref, "target", ProcessStatus::Running);

        let waiting_filter =
            ProcessListFilter::decode(&json!({ "waiting": true })).expect("decode waiting filter");
        let idle_filter =
            ProcessListFilter::decode(&json!({ "waiting": false })).expect("decode idle filter");

        assert_eq!(waiting_filter.list_mode(), ProcessListMode::Live);
        assert!(waiting_filter.matches_entry(&waiting_entry));
        assert!(!waiting_filter.matches_entry(&idle_entry));
        assert!(!idle_filter.matches_entry(&waiting_entry));
        assert!(idle_filter.matches_entry(&idle_entry));

        let rejection = ProcessListFilter::decode(&json!({ "waiting": "yes" }))
            .expect_err("invalid waiting filter");
        assert!(rejection.contains("must be a boolean"));
    }
}
1483}