Skip to main content

lash_core/runtime/process/
service.rs

1use crate::plugin::PluginError;
2
3use super::events::{ProcessAwaitOutput, ProcessEvent};
4use super::model::{
5    ProcessCancelSummary, ProcessHandleGrantEntry, ProcessHandleSummary, ProcessLifecycleStatus,
6    ProcessListMode, ProcessOpScope, ProcessRecord, ProcessRegistration, ProcessStartOptions,
7    ProcessStartRequest,
8};
9
10#[derive(Clone, Copy, Debug, PartialEq, Eq)]
11pub enum ProcessCancelSource {
12    Tool,
13    Lashlang,
14    HostApi,
15}
16
17#[derive(Clone)]
18pub struct ProcessCancelRequest<'scope> {
19    pub session_id: &'scope str,
20    pub process_id: &'scope str,
21    pub handle: Option<serde_json::Value>,
22    pub scope: ProcessOpScope<'scope>,
23    pub reason: Option<String>,
24    pub source: ProcessCancelSource,
25}
26
27impl<'scope> ProcessCancelRequest<'scope> {
28    pub fn new(
29        session_id: &'scope str,
30        process_id: &'scope str,
31        scope: ProcessOpScope<'scope>,
32        source: ProcessCancelSource,
33    ) -> Self {
34        Self {
35            session_id,
36            process_id,
37            handle: None,
38            scope,
39            reason: None,
40            source,
41        }
42    }
43
44    pub fn with_handle(mut self, handle: serde_json::Value) -> Self {
45        self.handle = Some(handle);
46        self
47    }
48
49    pub fn with_reason(mut self, reason: impl Into<String>) -> Self {
50        self.reason = Some(reason.into());
51        self
52    }
53}
54
55#[derive(Clone)]
56pub struct ProcessCancelAllRequest<'scope> {
57    pub session_id: &'scope str,
58    pub scope: ProcessOpScope<'scope>,
59    pub source: ProcessCancelSource,
60    pub reason: Option<String>,
61}
62
63impl<'scope> ProcessCancelAllRequest<'scope> {
64    pub fn new(
65        session_id: &'scope str,
66        scope: ProcessOpScope<'scope>,
67        source: ProcessCancelSource,
68    ) -> Self {
69        Self {
70            session_id,
71            scope,
72            source,
73            reason: None,
74        }
75    }
76
77    pub fn with_reason(mut self, reason: impl Into<String>) -> Self {
78        self.reason = Some(reason.into());
79        self
80    }
81}
82
83#[async_trait::async_trait]
84pub trait ProcessCancelAbility: Send + Sync {
85    async fn cancel(
86        &self,
87        processes: &dyn ProcessService,
88        request: ProcessCancelRequest<'_>,
89    ) -> Result<ProcessRecord, PluginError>;
90
91    async fn cancel_summary(
92        &self,
93        processes: &dyn ProcessService,
94        request: ProcessCancelRequest<'_>,
95    ) -> Result<ProcessCancelSummary, PluginError> {
96        self.cancel(processes, request)
97            .await
98            .map(ProcessCancelSummary::from_record)
99    }
100
101    async fn cancel_all_visible(
102        &self,
103        processes: &dyn ProcessService,
104        request: ProcessCancelAllRequest<'_>,
105    ) -> Result<Vec<ProcessCancelSummary>, PluginError> {
106        let entries = processes
107            .list_visible(
108                request.session_id,
109                ProcessListMode::Live,
110                request.scope.clone(),
111            )
112            .await?;
113        let mut cancelled = Vec::new();
114        for (grant, record) in entries {
115            if record.is_terminal() {
116                continue;
117            }
118            let mut cancel_request = ProcessCancelRequest::new(
119                request.session_id,
120                &grant.process_id,
121                request.scope.clone(),
122                request.source,
123            );
124            if let Some(reason) = request.reason.clone() {
125                cancel_request = cancel_request.with_reason(reason);
126            }
127            cancelled.push(self.cancel_summary(processes, cancel_request).await?);
128        }
129        Ok(cancelled)
130    }
131}
132
133#[derive(Clone, Copy, Debug, Default)]
134pub struct DefaultProcessCancelAbility;
135
136#[async_trait::async_trait]
137impl ProcessCancelAbility for DefaultProcessCancelAbility {
138    async fn cancel(
139        &self,
140        processes: &dyn ProcessService,
141        request: ProcessCancelRequest<'_>,
142    ) -> Result<ProcessRecord, PluginError> {
143        let process_ids = [request.process_id.to_string()];
144        processes
145            .validate_visible(request.session_id, &process_ids, request.scope.clone())
146            .await?;
147        processes
148            .cancel(request.session_id, request.process_id, request.scope)
149            .await
150    }
151}
152
153#[async_trait::async_trait]
154pub trait ProcessService: Send + Sync {
155    async fn start_from_request(
156        &self,
157        session_id: &str,
158        request: ProcessStartRequest,
159        scope: ProcessOpScope<'_>,
160    ) -> Result<ProcessHandleSummary, PluginError> {
161        let (registration, options, descriptor) = request.into_registration_and_options();
162        let record = self.start(session_id, registration, options, scope).await?;
163        let definition = super::model::ProcessDefinitionSummary::from_input(record.input.as_ref());
164        Ok(ProcessHandleSummary::new(
165            record.id,
166            descriptor,
167            ProcessLifecycleStatus::from(record.status),
168        ))
169        .map(|summary| summary.with_definition(definition))
170    }
171
172    async fn start(
173        &self,
174        session_id: &str,
175        registration: ProcessRegistration,
176        options: ProcessStartOptions,
177        scope: ProcessOpScope<'_>,
178    ) -> Result<ProcessRecord, PluginError>;
179
180    async fn await_process(
181        &self,
182        process_id: &str,
183        scope: ProcessOpScope<'_>,
184    ) -> Result<ProcessAwaitOutput, PluginError>;
185
186    async fn list_visible(
187        &self,
188        session_id: &str,
189        mode: ProcessListMode,
190        scope: ProcessOpScope<'_>,
191    ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError>;
192
193    async fn validate_visible(
194        &self,
195        session_id: &str,
196        process_ids: &[String],
197        scope: ProcessOpScope<'_>,
198    ) -> Result<(), PluginError>;
199
200    async fn cancel(
201        &self,
202        session_id: &str,
203        process_id: &str,
204        scope: ProcessOpScope<'_>,
205    ) -> Result<ProcessRecord, PluginError>;
206
207    async fn signal(
208        &self,
209        session_id: &str,
210        process_id: &str,
211        signal_id: String,
212        payload: serde_json::Value,
213        scope: ProcessOpScope<'_>,
214    ) -> Result<ProcessEvent, PluginError>;
215
216    async fn transfer(
217        &self,
218        from_session_id: &str,
219        to_session_id: &str,
220        process_ids: Vec<String>,
221        scope: ProcessOpScope<'_>,
222    ) -> Result<(), PluginError>;
223
224    async fn cancel_unreferenced(
225        &self,
226        session_id: &str,
227        keep_process_ids: Vec<String>,
228        scope: ProcessOpScope<'_>,
229    ) -> Result<Vec<ProcessRecord>, PluginError>;
230}
231
232pub struct UnavailableProcessService;
233
234#[async_trait::async_trait]
235impl ProcessService for UnavailableProcessService {
236    async fn start(
237        &self,
238        _session_id: &str,
239        _registration: ProcessRegistration,
240        _options: ProcessStartOptions,
241        _scope: ProcessOpScope<'_>,
242    ) -> Result<ProcessRecord, PluginError> {
243        Err(PluginError::Session(
244            "processes are unavailable in this runtime".to_string(),
245        ))
246    }
247
248    async fn await_process(
249        &self,
250        _process_id: &str,
251        _scope: ProcessOpScope<'_>,
252    ) -> Result<ProcessAwaitOutput, PluginError> {
253        Err(PluginError::Session(
254            "process awaiting is unavailable in this runtime".to_string(),
255        ))
256    }
257
258    async fn list_visible(
259        &self,
260        _session_id: &str,
261        _mode: ProcessListMode,
262        _scope: ProcessOpScope<'_>,
263    ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
264        Err(PluginError::Session(
265            "process registry is unavailable in this runtime".to_string(),
266        ))
267    }
268
269    async fn validate_visible(
270        &self,
271        _session_id: &str,
272        _process_ids: &[String],
273        _scope: ProcessOpScope<'_>,
274    ) -> Result<(), PluginError> {
275        Err(PluginError::Session(
276            "process handle validation is unavailable in this runtime".to_string(),
277        ))
278    }
279
280    async fn cancel(
281        &self,
282        _session_id: &str,
283        _process_id: &str,
284        _scope: ProcessOpScope<'_>,
285    ) -> Result<ProcessRecord, PluginError> {
286        Err(PluginError::Session(
287            "process registry is unavailable in this runtime".to_string(),
288        ))
289    }
290
291    async fn signal(
292        &self,
293        _session_id: &str,
294        _process_id: &str,
295        _signal_id: String,
296        _payload: serde_json::Value,
297        _scope: ProcessOpScope<'_>,
298    ) -> Result<ProcessEvent, PluginError> {
299        Err(PluginError::Session(
300            "process signalling is unavailable in this runtime".to_string(),
301        ))
302    }
303
304    async fn transfer(
305        &self,
306        _from_session_id: &str,
307        _to_session_id: &str,
308        process_ids: Vec<String>,
309        _scope: ProcessOpScope<'_>,
310    ) -> Result<(), PluginError> {
311        if process_ids.is_empty() {
312            return Ok(());
313        }
314        Err(PluginError::Session(
315            "process handle transfer is unavailable in this runtime".to_string(),
316        ))
317    }
318
319    async fn cancel_unreferenced(
320        &self,
321        _session_id: &str,
322        _keep_process_ids: Vec<String>,
323        _scope: ProcessOpScope<'_>,
324    ) -> Result<Vec<ProcessRecord>, PluginError> {
325        Err(PluginError::Session(
326            "process handle cleanup is unavailable in this runtime".to_string(),
327        ))
328    }
329}
330
331#[cfg(test)]
332mod tests {
333    use std::collections::HashSet;
334    use std::sync::{Arc, Mutex};
335
336    use serde_json::json;
337
338    use super::*;
339    use crate::{
340        ProcessAwaitOutput, ProcessEvent, ProcessHandleDescriptor, ProcessHandleGrant,
341        ProcessInput, ProcessRegistration, ProcessStatus,
342    };
343
344    struct RecordingProcessService {
345        visible: HashSet<String>,
346        validate_calls: Mutex<Vec<Vec<String>>>,
347        cancel_calls: Mutex<Vec<String>>,
348        visible_entries: Vec<ProcessHandleGrantEntry>,
349        record: ProcessRecord,
350    }
351
352    impl RecordingProcessService {
353        fn new(visible: impl IntoIterator<Item = String>, record: ProcessRecord) -> Self {
354            Self {
355                visible: visible.into_iter().collect(),
356                validate_calls: Mutex::new(Vec::new()),
357                cancel_calls: Mutex::new(Vec::new()),
358                visible_entries: Vec::new(),
359                record,
360            }
361        }
362
363        fn with_visible_entries(mut self, process_ids: impl IntoIterator<Item = String>) -> Self {
364            self.visible_entries = process_ids
365                .into_iter()
366                .map(|process_id| {
367                    (
368                        ProcessHandleGrant {
369                            session_id: "session-1".to_string(),
370                            process_id: process_id.clone(),
371                            descriptor: ProcessHandleDescriptor::new(
372                                Some("test"),
373                                Some(process_id.clone()),
374                            ),
375                        },
376                        ProcessRecord::from_registration(ProcessRegistration::new(
377                            process_id,
378                            ProcessInput::External {
379                                metadata: json!(null),
380                            },
381                        )),
382                    )
383                })
384                .collect();
385            self
386        }
387
388        fn validate_calls(&self) -> Vec<Vec<String>> {
389            self.validate_calls.lock().expect("validate calls").clone()
390        }
391
392        fn cancel_calls(&self) -> Vec<String> {
393            self.cancel_calls.lock().expect("cancel calls").clone()
394        }
395    }
396
397    #[derive(Default)]
398    struct RecordingCancelAbility {
399        requests: Mutex<Vec<(String, ProcessCancelSource, Option<String>)>>,
400    }
401
402    impl RecordingCancelAbility {
403        fn requests(&self) -> Vec<(String, ProcessCancelSource, Option<String>)> {
404            self.requests.lock().expect("cancel requests").clone()
405        }
406    }
407
408    #[async_trait::async_trait]
409    impl ProcessCancelAbility for RecordingCancelAbility {
410        async fn cancel(
411            &self,
412            processes: &dyn ProcessService,
413            request: ProcessCancelRequest<'_>,
414        ) -> Result<ProcessRecord, PluginError> {
415            self.requests.lock().expect("cancel requests").push((
416                request.process_id.to_string(),
417                request.source,
418                request.reason.clone(),
419            ));
420            DefaultProcessCancelAbility.cancel(processes, request).await
421        }
422    }
423
424    #[async_trait::async_trait]
425    impl ProcessService for RecordingProcessService {
426        async fn start(
427            &self,
428            _session_id: &str,
429            _registration: ProcessRegistration,
430            _options: ProcessStartOptions,
431            _scope: ProcessOpScope<'_>,
432        ) -> Result<ProcessRecord, PluginError> {
433            Err(PluginError::Session("start not implemented".to_string()))
434        }
435
436        async fn await_process(
437            &self,
438            _process_id: &str,
439            _scope: ProcessOpScope<'_>,
440        ) -> Result<ProcessAwaitOutput, PluginError> {
441            Err(PluginError::Session("await not implemented".to_string()))
442        }
443
444        async fn list_visible(
445            &self,
446            _session_id: &str,
447            _mode: ProcessListMode,
448            _scope: ProcessOpScope<'_>,
449        ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
450            Ok(self.visible_entries.clone())
451        }
452
453        async fn validate_visible(
454            &self,
455            _session_id: &str,
456            process_ids: &[String],
457            _scope: ProcessOpScope<'_>,
458        ) -> Result<(), PluginError> {
459            self.validate_calls
460                .lock()
461                .expect("validate calls")
462                .push(process_ids.to_vec());
463            if let Some(missing) = process_ids
464                .iter()
465                .find(|process_id| !self.visible.contains(*process_id))
466            {
467                return Err(PluginError::Session(format!(
468                    "process handle `{missing}` is not visible"
469                )));
470            }
471            Ok(())
472        }
473
474        async fn cancel(
475            &self,
476            _session_id: &str,
477            process_id: &str,
478            _scope: ProcessOpScope<'_>,
479        ) -> Result<ProcessRecord, PluginError> {
480            self.cancel_calls
481                .lock()
482                .expect("cancel calls")
483                .push(process_id.to_string());
484            let mut record = self.record.clone();
485            record.id = process_id.to_string();
486            Ok(record)
487        }
488
489        async fn signal(
490            &self,
491            _session_id: &str,
492            _process_id: &str,
493            _signal_id: String,
494            _payload: serde_json::Value,
495            _scope: ProcessOpScope<'_>,
496        ) -> Result<ProcessEvent, PluginError> {
497            Err(PluginError::Session("signal not implemented".to_string()))
498        }
499
500        async fn transfer(
501            &self,
502            _from_session_id: &str,
503            _to_session_id: &str,
504            _process_ids: Vec<String>,
505            _scope: ProcessOpScope<'_>,
506        ) -> Result<(), PluginError> {
507            Err(PluginError::Session("transfer not implemented".to_string()))
508        }
509
510        async fn cancel_unreferenced(
511            &self,
512            _session_id: &str,
513            _keep_process_ids: Vec<String>,
514            _scope: ProcessOpScope<'_>,
515        ) -> Result<Vec<ProcessRecord>, PluginError> {
516            Err(PluginError::Session(
517                "cancel unreferenced not implemented".to_string(),
518            ))
519        }
520    }
521
522    fn cancelled_record(process_id: &str) -> ProcessRecord {
523        let mut record = ProcessRecord::from_registration(ProcessRegistration::new(
524            process_id,
525            ProcessInput::External {
526                metadata: json!(null),
527            },
528        ));
529        record.status = ProcessStatus::Cancelled {
530            await_output: ProcessAwaitOutput::Cancelled {
531                message: "cancelled".to_string(),
532                raw: None,
533                control: None,
534            },
535        };
536        record
537    }
538
539    fn test_process_scope(id: &str) -> ProcessOpScope<'static> {
540        ProcessOpScope::new(
541            crate::ScopedEffectController::shared(
542                Arc::new(crate::InlineRuntimeEffectController),
543                crate::EffectScope::runtime_operation(id),
544            )
545            .expect("test effect scope"),
546        )
547    }
548
549    #[tokio::test]
550    async fn default_process_cancel_ability_validates_visibility_and_calls_primitive() {
551        let service =
552            RecordingProcessService::new(["process-1".to_string()], cancelled_record("process-1"));
553
554        let record = DefaultProcessCancelAbility
555            .cancel(
556                &service,
557                ProcessCancelRequest::new(
558                    "session-1",
559                    "process-1",
560                    test_process_scope("cancel-visible"),
561                    ProcessCancelSource::HostApi,
562                ),
563            )
564            .await
565            .expect("cancel process");
566
567        assert_eq!(record.status.label(), "cancelled");
568        assert_eq!(
569            service.validate_calls(),
570            vec![vec!["process-1".to_string()]]
571        );
572        assert_eq!(service.cancel_calls(), vec!["process-1".to_string()]);
573    }
574
575    #[tokio::test]
576    async fn default_process_cancel_ability_rejects_invisible_process_without_cancel() {
577        let service = RecordingProcessService::new(Vec::<String>::new(), cancelled_record("p1"));
578
579        let err = DefaultProcessCancelAbility
580            .cancel(
581                &service,
582                ProcessCancelRequest::new(
583                    "session-1",
584                    "p1",
585                    test_process_scope("cancel-hidden"),
586                    ProcessCancelSource::Tool,
587                ),
588            )
589            .await
590            .expect_err("hidden process should be rejected");
591
592        assert!(err.to_string().contains("not visible"), "{err}");
593        assert!(service.cancel_calls().is_empty());
594    }
595
596    #[tokio::test]
597    async fn process_cancel_ability_cancel_all_visible_uses_same_cancel_path() {
598        let service = RecordingProcessService::new(
599            ["process-1".to_string(), "process-2".to_string()],
600            cancelled_record("template"),
601        )
602        .with_visible_entries(["process-1".to_string(), "process-2".to_string()]);
603        let ability = RecordingCancelAbility::default();
604
605        let summaries = ability
606            .cancel_all_visible(
607                &service,
608                ProcessCancelAllRequest::new(
609                    "session-1",
610                    test_process_scope("cancel-all"),
611                    ProcessCancelSource::Tool,
612                )
613                .with_reason("requested by tool"),
614            )
615            .await
616            .expect("cancel all visible");
617
618        assert_eq!(
619            summaries
620                .iter()
621                .map(|summary| summary.process_id.as_str())
622                .collect::<Vec<_>>(),
623            vec!["process-1", "process-2"]
624        );
625        assert_eq!(
626            ability.requests(),
627            vec![
628                (
629                    "process-1".to_string(),
630                    ProcessCancelSource::Tool,
631                    Some("requested by tool".to_string())
632                ),
633                (
634                    "process-2".to_string(),
635                    ProcessCancelSource::Tool,
636                    Some("requested by tool".to_string())
637                )
638            ]
639        );
640        assert_eq!(
641            service.validate_calls(),
642            vec![vec!["process-1".to_string()], vec!["process-2".to_string()]]
643        );
644        assert_eq!(
645            service.cancel_calls(),
646            vec!["process-1".to_string(), "process-2".to_string()]
647        );
648    }
649}