Skip to main content

lash_core/runtime/process/
service.rs

1use crate::plugin::PluginError;
2
3use super::events::{ProcessAwaitOutput, ProcessEvent};
4use super::model::{
5    ProcessCancelSummary, ProcessHandleGrantEntry, ProcessHandleSummary, ProcessListMode,
6    ProcessOpScope, ProcessRecord, ProcessRegistration, ProcessStartOptions, ProcessStartRequest,
7};
8
9#[derive(Clone, Copy, Debug, PartialEq, Eq)]
10pub enum ProcessCancelSource {
11    Tool,
12    Process,
13    HostApi,
14}
15
16#[derive(Clone)]
17pub struct ProcessCancelRequest<'scope> {
18    pub session_id: &'scope str,
19    pub process_id: &'scope str,
20    pub handle: Option<serde_json::Value>,
21    pub scope: ProcessOpScope<'scope>,
22    pub reason: Option<String>,
23    pub source: ProcessCancelSource,
24}
25
26impl<'scope> ProcessCancelRequest<'scope> {
27    pub fn new(
28        session_id: &'scope str,
29        process_id: &'scope str,
30        scope: ProcessOpScope<'scope>,
31        source: ProcessCancelSource,
32    ) -> Self {
33        Self {
34            session_id,
35            process_id,
36            handle: None,
37            scope,
38            reason: None,
39            source,
40        }
41    }
42
43    pub fn with_handle(mut self, handle: serde_json::Value) -> Self {
44        self.handle = Some(handle);
45        self
46    }
47
48    pub fn with_reason(mut self, reason: impl Into<String>) -> Self {
49        self.reason = Some(reason.into());
50        self
51    }
52}
53
54#[derive(Clone)]
55pub struct ProcessCancelAllRequest<'scope> {
56    pub session_id: &'scope str,
57    pub scope: ProcessOpScope<'scope>,
58    pub source: ProcessCancelSource,
59    pub reason: Option<String>,
60}
61
62impl<'scope> ProcessCancelAllRequest<'scope> {
63    pub fn new(
64        session_id: &'scope str,
65        scope: ProcessOpScope<'scope>,
66        source: ProcessCancelSource,
67    ) -> Self {
68        Self {
69            session_id,
70            scope,
71            source,
72            reason: None,
73        }
74    }
75
76    pub fn with_reason(mut self, reason: impl Into<String>) -> Self {
77        self.reason = Some(reason.into());
78        self
79    }
80}
81
82#[async_trait::async_trait]
83pub trait ProcessCancelAbility: Send + Sync {
84    async fn cancel(
85        &self,
86        processes: &dyn ProcessService,
87        request: ProcessCancelRequest<'_>,
88    ) -> Result<ProcessRecord, PluginError>;
89
90    async fn cancel_summary(
91        &self,
92        processes: &dyn ProcessService,
93        request: ProcessCancelRequest<'_>,
94    ) -> Result<ProcessCancelSummary, PluginError> {
95        self.cancel(processes, request)
96            .await
97            .map(ProcessCancelSummary::from_record)
98    }
99
100    async fn cancel_all_visible(
101        &self,
102        processes: &dyn ProcessService,
103        request: ProcessCancelAllRequest<'_>,
104    ) -> Result<Vec<ProcessCancelSummary>, PluginError> {
105        let entries = processes
106            .list_visible(
107                request.session_id,
108                ProcessListMode::Live,
109                request.scope.clone(),
110            )
111            .await?;
112        let mut cancelled = Vec::new();
113        for (grant, record) in entries {
114            if record.is_terminal() {
115                continue;
116            }
117            let mut cancel_request = ProcessCancelRequest::new(
118                request.session_id,
119                &grant.process_id,
120                request.scope.clone(),
121                request.source,
122            );
123            if let Some(reason) = request.reason.clone() {
124                cancel_request = cancel_request.with_reason(reason);
125            }
126            cancelled.push(self.cancel_summary(processes, cancel_request).await?);
127        }
128        Ok(cancelled)
129    }
130}
131
132#[derive(Clone, Copy, Debug, Default)]
133pub struct DefaultProcessCancelAbility;
134
135#[async_trait::async_trait]
136impl ProcessCancelAbility for DefaultProcessCancelAbility {
137    async fn cancel(
138        &self,
139        processes: &dyn ProcessService,
140        request: ProcessCancelRequest<'_>,
141    ) -> Result<ProcessRecord, PluginError> {
142        let process_ids = [request.process_id.to_string()];
143        processes
144            .validate_visible(request.session_id, &process_ids, request.scope.clone())
145            .await?;
146        processes
147            .cancel(request.session_id, request.process_id, request.scope)
148            .await
149    }
150}
151
152#[async_trait::async_trait]
153pub trait ProcessService: Send + Sync {
154    async fn start_from_request(
155        &self,
156        session_id: &str,
157        request: ProcessStartRequest,
158        scope: ProcessOpScope<'_>,
159    ) -> Result<ProcessHandleSummary, PluginError> {
160        let _ = (session_id, request, scope);
161        Err(PluginError::Session(
162            "process start request composition is unavailable in this service".to_string(),
163        ))
164    }
165
166    async fn start(
167        &self,
168        session_id: &str,
169        registration: ProcessRegistration,
170        options: ProcessStartOptions,
171        scope: ProcessOpScope<'_>,
172    ) -> Result<ProcessRecord, PluginError>;
173
174    /// Write the terminal outcome for an Externally-Owned process the session
175    /// holds a grant for (ADR 0019). Closure for work lash never executes — a
176    /// detached command records its immediately-terminal launch fact here. Only
177    /// Externally-Owned rows may be completed this way.
178    async fn complete_external(
179        &self,
180        session_id: &str,
181        process_id: &str,
182        await_output: ProcessAwaitOutput,
183        scope: ProcessOpScope<'_>,
184    ) -> Result<ProcessRecord, PluginError> {
185        let _ = (session_id, process_id, await_output, scope);
186        Err(PluginError::Session(
187            "external process completion is unavailable in this service".to_string(),
188        ))
189    }
190
191    async fn await_process(
192        &self,
193        process_id: &str,
194        scope: ProcessOpScope<'_>,
195    ) -> Result<ProcessAwaitOutput, PluginError>;
196
197    async fn list_visible(
198        &self,
199        session_id: &str,
200        mode: ProcessListMode,
201        scope: ProcessOpScope<'_>,
202    ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError>;
203
204    async fn validate_visible(
205        &self,
206        session_id: &str,
207        process_ids: &[String],
208        scope: ProcessOpScope<'_>,
209    ) -> Result<(), PluginError>;
210
211    async fn cancel(
212        &self,
213        session_id: &str,
214        process_id: &str,
215        scope: ProcessOpScope<'_>,
216    ) -> Result<ProcessRecord, PluginError>;
217
218    async fn signal(
219        &self,
220        session_id: &str,
221        process_id: &str,
222        signal_name: String,
223        signal_id: String,
224        payload: serde_json::Value,
225        scope: ProcessOpScope<'_>,
226    ) -> Result<ProcessEvent, PluginError>;
227
228    async fn transfer(
229        &self,
230        from_session_id: &str,
231        to_session_id: &str,
232        process_ids: Vec<String>,
233        scope: ProcessOpScope<'_>,
234    ) -> Result<(), PluginError>;
235
236    async fn cancel_unreferenced(
237        &self,
238        session_id: &str,
239        keep_process_ids: Vec<String>,
240        scope: ProcessOpScope<'_>,
241    ) -> Result<Vec<ProcessRecord>, PluginError>;
242}
243
244pub struct UnavailableProcessService;
245
246#[async_trait::async_trait]
247impl ProcessService for UnavailableProcessService {
248    async fn start(
249        &self,
250        _session_id: &str,
251        _registration: ProcessRegistration,
252        _options: ProcessStartOptions,
253        _scope: ProcessOpScope<'_>,
254    ) -> Result<ProcessRecord, PluginError> {
255        Err(PluginError::Session(
256            "processes are unavailable in this runtime".to_string(),
257        ))
258    }
259
260    async fn await_process(
261        &self,
262        _process_id: &str,
263        _scope: ProcessOpScope<'_>,
264    ) -> Result<ProcessAwaitOutput, PluginError> {
265        Err(PluginError::Session(
266            "process awaiting is unavailable in this runtime".to_string(),
267        ))
268    }
269
270    async fn list_visible(
271        &self,
272        _session_id: &str,
273        _mode: ProcessListMode,
274        _scope: ProcessOpScope<'_>,
275    ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
276        Err(PluginError::Session(
277            "process registry is unavailable in this runtime".to_string(),
278        ))
279    }
280
281    async fn validate_visible(
282        &self,
283        _session_id: &str,
284        _process_ids: &[String],
285        _scope: ProcessOpScope<'_>,
286    ) -> Result<(), PluginError> {
287        Err(PluginError::Session(
288            "process handle validation is unavailable in this runtime".to_string(),
289        ))
290    }
291
292    async fn cancel(
293        &self,
294        _session_id: &str,
295        _process_id: &str,
296        _scope: ProcessOpScope<'_>,
297    ) -> Result<ProcessRecord, PluginError> {
298        Err(PluginError::Session(
299            "process registry is unavailable in this runtime".to_string(),
300        ))
301    }
302
303    async fn signal(
304        &self,
305        _session_id: &str,
306        _process_id: &str,
307        _signal_name: String,
308        _signal_id: String,
309        _payload: serde_json::Value,
310        _scope: ProcessOpScope<'_>,
311    ) -> Result<ProcessEvent, PluginError> {
312        Err(PluginError::Session(
313            "process signalling is unavailable in this runtime".to_string(),
314        ))
315    }
316
317    async fn transfer(
318        &self,
319        _from_session_id: &str,
320        _to_session_id: &str,
321        process_ids: Vec<String>,
322        _scope: ProcessOpScope<'_>,
323    ) -> Result<(), PluginError> {
324        if process_ids.is_empty() {
325            return Ok(());
326        }
327        Err(PluginError::Session(
328            "process handle transfer is unavailable in this runtime".to_string(),
329        ))
330    }
331
332    async fn cancel_unreferenced(
333        &self,
334        _session_id: &str,
335        _keep_process_ids: Vec<String>,
336        _scope: ProcessOpScope<'_>,
337    ) -> Result<Vec<ProcessRecord>, PluginError> {
338        Err(PluginError::Session(
339            "process handle cleanup is unavailable in this runtime".to_string(),
340        ))
341    }
342}
343
344#[cfg(test)]
345mod tests {
346    use std::collections::HashSet;
347    use std::sync::{Arc, Mutex};
348
349    use serde_json::json;
350
351    use super::*;
352    use crate::{
353        ProcessAwaitOutput, ProcessEvent, ProcessHandleDescriptor, ProcessHandleGrant,
354        ProcessInput, ProcessProvenance, ProcessRegistration, ProcessStatus,
355    };
356
357    struct RecordingProcessService {
358        visible: HashSet<String>,
359        validate_calls: Mutex<Vec<Vec<String>>>,
360        cancel_calls: Mutex<Vec<String>>,
361        visible_entries: Vec<ProcessHandleGrantEntry>,
362        record: ProcessRecord,
363    }
364
365    impl RecordingProcessService {
366        fn new(visible: impl IntoIterator<Item = String>, record: ProcessRecord) -> Self {
367            Self {
368                visible: visible.into_iter().collect(),
369                validate_calls: Mutex::new(Vec::new()),
370                cancel_calls: Mutex::new(Vec::new()),
371                visible_entries: Vec::new(),
372                record,
373            }
374        }
375
376        fn with_visible_entries(mut self, process_ids: impl IntoIterator<Item = String>) -> Self {
377            self.visible_entries = process_ids
378                .into_iter()
379                .map(|process_id| {
380                    (
381                        ProcessHandleGrant {
382                            session_id: "session-1".to_string(),
383                            process_id: process_id.clone(),
384                            descriptor: ProcessHandleDescriptor::new(
385                                Some("test"),
386                                Some(process_id.clone()),
387                            ),
388                        },
389                        ProcessRecord::from_registration(ProcessRegistration::new(
390                            process_id,
391                            ProcessInput::External {
392                                metadata: json!(null),
393                            },
394                            crate::RecoveryDisposition::ExternallyOwned,
395                            ProcessProvenance::host(),
396                        )),
397                    )
398                })
399                .collect();
400            self
401        }
402
403        fn validate_calls(&self) -> Vec<Vec<String>> {
404            self.validate_calls.lock().expect("validate calls").clone()
405        }
406
407        fn cancel_calls(&self) -> Vec<String> {
408            self.cancel_calls.lock().expect("cancel calls").clone()
409        }
410    }
411
412    #[derive(Default)]
413    struct RecordingCancelAbility {
414        requests: Mutex<Vec<(String, ProcessCancelSource, Option<String>)>>,
415    }
416
417    impl RecordingCancelAbility {
418        fn requests(&self) -> Vec<(String, ProcessCancelSource, Option<String>)> {
419            self.requests.lock().expect("cancel requests").clone()
420        }
421    }
422
423    #[async_trait::async_trait]
424    impl ProcessCancelAbility for RecordingCancelAbility {
425        async fn cancel(
426            &self,
427            processes: &dyn ProcessService,
428            request: ProcessCancelRequest<'_>,
429        ) -> Result<ProcessRecord, PluginError> {
430            self.requests.lock().expect("cancel requests").push((
431                request.process_id.to_string(),
432                request.source,
433                request.reason.clone(),
434            ));
435            DefaultProcessCancelAbility.cancel(processes, request).await
436        }
437    }
438
439    #[async_trait::async_trait]
440    impl ProcessService for RecordingProcessService {
441        async fn start(
442            &self,
443            _session_id: &str,
444            _registration: ProcessRegistration,
445            _options: ProcessStartOptions,
446            _scope: ProcessOpScope<'_>,
447        ) -> Result<ProcessRecord, PluginError> {
448            Err(PluginError::Session("start not implemented".to_string()))
449        }
450
451        async fn await_process(
452            &self,
453            _process_id: &str,
454            _scope: ProcessOpScope<'_>,
455        ) -> Result<ProcessAwaitOutput, PluginError> {
456            Err(PluginError::Session("await not implemented".to_string()))
457        }
458
459        async fn list_visible(
460            &self,
461            _session_id: &str,
462            _mode: ProcessListMode,
463            _scope: ProcessOpScope<'_>,
464        ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
465            Ok(self.visible_entries.clone())
466        }
467
468        async fn validate_visible(
469            &self,
470            _session_id: &str,
471            process_ids: &[String],
472            _scope: ProcessOpScope<'_>,
473        ) -> Result<(), PluginError> {
474            self.validate_calls
475                .lock()
476                .expect("validate calls")
477                .push(process_ids.to_vec());
478            if let Some(missing) = process_ids
479                .iter()
480                .find(|process_id| !self.visible.contains(*process_id))
481            {
482                return Err(PluginError::Session(format!(
483                    "process handle `{missing}` is not visible"
484                )));
485            }
486            Ok(())
487        }
488
489        async fn cancel(
490            &self,
491            _session_id: &str,
492            process_id: &str,
493            _scope: ProcessOpScope<'_>,
494        ) -> Result<ProcessRecord, PluginError> {
495            self.cancel_calls
496                .lock()
497                .expect("cancel calls")
498                .push(process_id.to_string());
499            let mut record = self.record.clone();
500            record.id = process_id.to_string();
501            Ok(record)
502        }
503
504        async fn signal(
505            &self,
506            _session_id: &str,
507            _process_id: &str,
508            _signal_name: String,
509            _signal_id: String,
510            _payload: serde_json::Value,
511            _scope: ProcessOpScope<'_>,
512        ) -> Result<ProcessEvent, PluginError> {
513            Err(PluginError::Session("signal not implemented".to_string()))
514        }
515
516        async fn transfer(
517            &self,
518            _from_session_id: &str,
519            _to_session_id: &str,
520            _process_ids: Vec<String>,
521            _scope: ProcessOpScope<'_>,
522        ) -> Result<(), PluginError> {
523            Err(PluginError::Session("transfer not implemented".to_string()))
524        }
525
526        async fn cancel_unreferenced(
527            &self,
528            _session_id: &str,
529            _keep_process_ids: Vec<String>,
530            _scope: ProcessOpScope<'_>,
531        ) -> Result<Vec<ProcessRecord>, PluginError> {
532            Err(PluginError::Session(
533                "cancel unreferenced not implemented".to_string(),
534            ))
535        }
536    }
537
538    fn cancelled_record(process_id: &str) -> ProcessRecord {
539        let mut record = ProcessRecord::from_registration(ProcessRegistration::new(
540            process_id,
541            ProcessInput::External {
542                metadata: json!(null),
543            },
544            crate::RecoveryDisposition::ExternallyOwned,
545            ProcessProvenance::host(),
546        ));
547        record.status = ProcessStatus::Cancelled {
548            await_output: ProcessAwaitOutput::Cancelled {
549                message: "cancelled".to_string(),
550                raw: None,
551                control: None,
552            },
553        };
554        record
555    }
556
557    fn test_process_scope(id: &str) -> ProcessOpScope<'static> {
558        ProcessOpScope::new(
559            crate::ScopedEffectController::shared(
560                Arc::new(crate::InlineRuntimeEffectController),
561                crate::ExecutionScope::runtime_operation(id),
562            )
563            .expect("test execution scope"),
564        )
565    }
566
567    #[tokio::test]
568    async fn default_process_cancel_ability_validates_visibility_and_calls_primitive() {
569        let service =
570            RecordingProcessService::new(["process-1".to_string()], cancelled_record("process-1"));
571
572        let record = DefaultProcessCancelAbility
573            .cancel(
574                &service,
575                ProcessCancelRequest::new(
576                    "session-1",
577                    "process-1",
578                    test_process_scope("cancel-visible"),
579                    ProcessCancelSource::HostApi,
580                ),
581            )
582            .await
583            .expect("cancel process");
584
585        assert_eq!(record.status.label(), "cancelled");
586        assert_eq!(
587            service.validate_calls(),
588            vec![vec!["process-1".to_string()]]
589        );
590        assert_eq!(service.cancel_calls(), vec!["process-1".to_string()]);
591    }
592
593    #[tokio::test]
594    async fn default_process_cancel_ability_rejects_invisible_process_without_cancel() {
595        let service = RecordingProcessService::new(Vec::<String>::new(), cancelled_record("p1"));
596
597        let err = DefaultProcessCancelAbility
598            .cancel(
599                &service,
600                ProcessCancelRequest::new(
601                    "session-1",
602                    "p1",
603                    test_process_scope("cancel-hidden"),
604                    ProcessCancelSource::Tool,
605                ),
606            )
607            .await
608            .expect_err("hidden process should be rejected");
609
610        assert!(err.to_string().contains("not visible"), "{err}");
611        assert!(service.cancel_calls().is_empty());
612    }
613
614    #[tokio::test]
615    async fn process_cancel_ability_cancel_all_visible_uses_same_cancel_path() {
616        let service = RecordingProcessService::new(
617            ["process-1".to_string(), "process-2".to_string()],
618            cancelled_record("template"),
619        )
620        .with_visible_entries(["process-1".to_string(), "process-2".to_string()]);
621        let ability = RecordingCancelAbility::default();
622
623        let summaries = ability
624            .cancel_all_visible(
625                &service,
626                ProcessCancelAllRequest::new(
627                    "session-1",
628                    test_process_scope("cancel-all"),
629                    ProcessCancelSource::Tool,
630                )
631                .with_reason("requested by tool"),
632            )
633            .await
634            .expect("cancel all visible");
635
636        assert_eq!(
637            summaries
638                .iter()
639                .map(|summary| summary.process_id.as_str())
640                .collect::<Vec<_>>(),
641            vec!["process-1", "process-2"]
642        );
643        assert_eq!(
644            ability.requests(),
645            vec![
646                (
647                    "process-1".to_string(),
648                    ProcessCancelSource::Tool,
649                    Some("requested by tool".to_string())
650                ),
651                (
652                    "process-2".to_string(),
653                    ProcessCancelSource::Tool,
654                    Some("requested by tool".to_string())
655                )
656            ]
657        );
658        assert_eq!(
659            service.validate_calls(),
660            vec![vec!["process-1".to_string()], vec!["process-2".to_string()]]
661        );
662        assert_eq!(
663            service.cancel_calls(),
664            vec!["process-1".to_string(), "process-2".to_string()]
665        );
666    }
667}