1use crate::plugin::PluginError;
2
3use super::events::{ProcessAwaitOutput, ProcessEvent};
4use super::model::{
5 ProcessCancelSummary, ProcessHandleGrantEntry, ProcessHandleSummary, ProcessLifecycleStatus,
6 ProcessListMode, ProcessOpScope, ProcessRecord, ProcessRegistration, ProcessStartOptions,
7 ProcessStartRequest,
8};
9
/// Origin of a process cancellation request.
///
/// Stored in the `source` field of [`ProcessCancelRequest`] and
/// [`ProcessCancelAllRequest`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcessCancelSource {
    /// Presumably a cancellation issued by a tool invocation — confirm with callers.
    Tool,
    /// Presumably a cancellation issued from Lashlang code — confirm with callers.
    Lashlang,
    /// Presumably a cancellation issued through the host API — confirm with callers.
    HostApi,
}
16
17#[derive(Clone)]
18pub struct ProcessCancelRequest<'scope> {
19 pub session_id: &'scope str,
20 pub process_id: &'scope str,
21 pub handle: Option<serde_json::Value>,
22 pub scope: ProcessOpScope<'scope>,
23 pub reason: Option<String>,
24 pub source: ProcessCancelSource,
25}
26
27impl<'scope> ProcessCancelRequest<'scope> {
28 pub fn new(
29 session_id: &'scope str,
30 process_id: &'scope str,
31 scope: ProcessOpScope<'scope>,
32 source: ProcessCancelSource,
33 ) -> Self {
34 Self {
35 session_id,
36 process_id,
37 handle: None,
38 scope,
39 reason: None,
40 source,
41 }
42 }
43
44 pub fn with_handle(mut self, handle: serde_json::Value) -> Self {
45 self.handle = Some(handle);
46 self
47 }
48
49 pub fn with_reason(mut self, reason: impl Into<String>) -> Self {
50 self.reason = Some(reason.into());
51 self
52 }
53}
54
55#[derive(Clone)]
56pub struct ProcessCancelAllRequest<'scope> {
57 pub session_id: &'scope str,
58 pub scope: ProcessOpScope<'scope>,
59 pub source: ProcessCancelSource,
60 pub reason: Option<String>,
61}
62
63impl<'scope> ProcessCancelAllRequest<'scope> {
64 pub fn new(
65 session_id: &'scope str,
66 scope: ProcessOpScope<'scope>,
67 source: ProcessCancelSource,
68 ) -> Self {
69 Self {
70 session_id,
71 scope,
72 source,
73 reason: None,
74 }
75 }
76
77 pub fn with_reason(mut self, reason: impl Into<String>) -> Self {
78 self.reason = Some(reason.into());
79 self
80 }
81}
82
83#[async_trait::async_trait]
84pub trait ProcessCancelAbility: Send + Sync {
85 async fn cancel(
86 &self,
87 processes: &dyn ProcessService,
88 request: ProcessCancelRequest<'_>,
89 ) -> Result<ProcessRecord, PluginError>;
90
91 async fn cancel_summary(
92 &self,
93 processes: &dyn ProcessService,
94 request: ProcessCancelRequest<'_>,
95 ) -> Result<ProcessCancelSummary, PluginError> {
96 self.cancel(processes, request)
97 .await
98 .map(ProcessCancelSummary::from_record)
99 }
100
101 async fn cancel_all_visible(
102 &self,
103 processes: &dyn ProcessService,
104 request: ProcessCancelAllRequest<'_>,
105 ) -> Result<Vec<ProcessCancelSummary>, PluginError> {
106 let entries = processes
107 .list_visible(
108 request.session_id,
109 ProcessListMode::Live,
110 request.scope.clone(),
111 )
112 .await?;
113 let mut cancelled = Vec::new();
114 for (grant, record) in entries {
115 if record.is_terminal() {
116 continue;
117 }
118 let mut cancel_request = ProcessCancelRequest::new(
119 request.session_id,
120 &grant.process_id,
121 request.scope.clone(),
122 request.source,
123 );
124 if let Some(reason) = request.reason.clone() {
125 cancel_request = cancel_request.with_reason(reason);
126 }
127 cancelled.push(self.cancel_summary(processes, cancel_request).await?);
128 }
129 Ok(cancelled)
130 }
131}
132
133#[derive(Clone, Copy, Debug, Default)]
134pub struct DefaultProcessCancelAbility;
135
136#[async_trait::async_trait]
137impl ProcessCancelAbility for DefaultProcessCancelAbility {
138 async fn cancel(
139 &self,
140 processes: &dyn ProcessService,
141 request: ProcessCancelRequest<'_>,
142 ) -> Result<ProcessRecord, PluginError> {
143 let process_ids = [request.process_id.to_string()];
144 processes
145 .validate_visible(request.session_id, &process_ids, request.scope.clone())
146 .await?;
147 processes
148 .cancel(request.session_id, request.process_id, request.scope)
149 .await
150 }
151}
152
153#[async_trait::async_trait]
154pub trait ProcessService: Send + Sync {
155 async fn start_from_request(
156 &self,
157 session_id: &str,
158 request: ProcessStartRequest,
159 scope: ProcessOpScope<'_>,
160 ) -> Result<ProcessHandleSummary, PluginError> {
161 let (registration, options, descriptor) = request.into_registration_and_options();
162 let record = self.start(session_id, registration, options, scope).await?;
163 let definition = super::model::ProcessDefinitionSummary::from_input(record.input.as_ref());
164 Ok(ProcessHandleSummary::new(
165 record.id,
166 descriptor,
167 ProcessLifecycleStatus::from(record.status),
168 ))
169 .map(|summary| summary.with_definition(definition))
170 }
171
172 async fn start(
173 &self,
174 session_id: &str,
175 registration: ProcessRegistration,
176 options: ProcessStartOptions,
177 scope: ProcessOpScope<'_>,
178 ) -> Result<ProcessRecord, PluginError>;
179
180 async fn await_process(
181 &self,
182 process_id: &str,
183 scope: ProcessOpScope<'_>,
184 ) -> Result<ProcessAwaitOutput, PluginError>;
185
186 async fn list_visible(
187 &self,
188 session_id: &str,
189 mode: ProcessListMode,
190 scope: ProcessOpScope<'_>,
191 ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError>;
192
193 async fn validate_visible(
194 &self,
195 session_id: &str,
196 process_ids: &[String],
197 scope: ProcessOpScope<'_>,
198 ) -> Result<(), PluginError>;
199
200 async fn cancel(
201 &self,
202 session_id: &str,
203 process_id: &str,
204 scope: ProcessOpScope<'_>,
205 ) -> Result<ProcessRecord, PluginError>;
206
207 async fn signal(
208 &self,
209 session_id: &str,
210 process_id: &str,
211 signal_id: String,
212 payload: serde_json::Value,
213 scope: ProcessOpScope<'_>,
214 ) -> Result<ProcessEvent, PluginError>;
215
216 async fn transfer(
217 &self,
218 from_session_id: &str,
219 to_session_id: &str,
220 process_ids: Vec<String>,
221 scope: ProcessOpScope<'_>,
222 ) -> Result<(), PluginError>;
223
224 async fn cancel_unreferenced(
225 &self,
226 session_id: &str,
227 keep_process_ids: Vec<String>,
228 scope: ProcessOpScope<'_>,
229 ) -> Result<Vec<ProcessRecord>, PluginError>;
230}
231
232pub struct UnavailableProcessService;
233
234#[async_trait::async_trait]
235impl ProcessService for UnavailableProcessService {
236 async fn start(
237 &self,
238 _session_id: &str,
239 _registration: ProcessRegistration,
240 _options: ProcessStartOptions,
241 _scope: ProcessOpScope<'_>,
242 ) -> Result<ProcessRecord, PluginError> {
243 Err(PluginError::Session(
244 "processes are unavailable in this runtime".to_string(),
245 ))
246 }
247
248 async fn await_process(
249 &self,
250 _process_id: &str,
251 _scope: ProcessOpScope<'_>,
252 ) -> Result<ProcessAwaitOutput, PluginError> {
253 Err(PluginError::Session(
254 "process awaiting is unavailable in this runtime".to_string(),
255 ))
256 }
257
258 async fn list_visible(
259 &self,
260 _session_id: &str,
261 _mode: ProcessListMode,
262 _scope: ProcessOpScope<'_>,
263 ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
264 Err(PluginError::Session(
265 "process registry is unavailable in this runtime".to_string(),
266 ))
267 }
268
269 async fn validate_visible(
270 &self,
271 _session_id: &str,
272 _process_ids: &[String],
273 _scope: ProcessOpScope<'_>,
274 ) -> Result<(), PluginError> {
275 Err(PluginError::Session(
276 "process handle validation is unavailable in this runtime".to_string(),
277 ))
278 }
279
280 async fn cancel(
281 &self,
282 _session_id: &str,
283 _process_id: &str,
284 _scope: ProcessOpScope<'_>,
285 ) -> Result<ProcessRecord, PluginError> {
286 Err(PluginError::Session(
287 "process registry is unavailable in this runtime".to_string(),
288 ))
289 }
290
291 async fn signal(
292 &self,
293 _session_id: &str,
294 _process_id: &str,
295 _signal_id: String,
296 _payload: serde_json::Value,
297 _scope: ProcessOpScope<'_>,
298 ) -> Result<ProcessEvent, PluginError> {
299 Err(PluginError::Session(
300 "process signalling is unavailable in this runtime".to_string(),
301 ))
302 }
303
304 async fn transfer(
305 &self,
306 _from_session_id: &str,
307 _to_session_id: &str,
308 process_ids: Vec<String>,
309 _scope: ProcessOpScope<'_>,
310 ) -> Result<(), PluginError> {
311 if process_ids.is_empty() {
312 return Ok(());
313 }
314 Err(PluginError::Session(
315 "process handle transfer is unavailable in this runtime".to_string(),
316 ))
317 }
318
319 async fn cancel_unreferenced(
320 &self,
321 _session_id: &str,
322 _keep_process_ids: Vec<String>,
323 _scope: ProcessOpScope<'_>,
324 ) -> Result<Vec<ProcessRecord>, PluginError> {
325 Err(PluginError::Session(
326 "process handle cleanup is unavailable in this runtime".to_string(),
327 ))
328 }
329}
330
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::{Arc, Mutex};

    use serde_json::json;

    use super::*;
    use crate::{
        ProcessAwaitOutput, ProcessEvent, ProcessHandleDescriptor, ProcessHandleGrant,
        ProcessInput, ProcessRegistration, ProcessStatus,
    };

    /// One cancel request seen by `RecordingCancelAbility`: process id, source, reason.
    type RecordedCancel = (String, ProcessCancelSource, Option<String>);

    /// Builds a fresh record for an external process with null metadata.
    fn external_record(process_id: &str) -> ProcessRecord {
        ProcessRecord::from_registration(ProcessRegistration::new(
            process_id,
            ProcessInput::External {
                metadata: json!(null),
            },
        ))
    }

    /// `ProcessService` double that records `validate_visible` and `cancel`
    /// calls and answers them from canned data.
    struct RecordingProcessService {
        /// Ids accepted by `validate_visible`.
        visible: HashSet<String>,
        /// Argument lists passed to `validate_visible`, in call order.
        validate_calls: Mutex<Vec<Vec<String>>>,
        /// Process ids passed to `cancel`, in call order.
        cancel_calls: Mutex<Vec<String>>,
        /// Entries returned by `list_visible`.
        visible_entries: Vec<ProcessHandleGrantEntry>,
        /// Template record returned (with the id patched) by `cancel`.
        record: ProcessRecord,
    }

    impl RecordingProcessService {
        fn new(visible: impl IntoIterator<Item = String>, record: ProcessRecord) -> Self {
            Self {
                visible: visible.into_iter().collect(),
                validate_calls: Mutex::new(Vec::new()),
                cancel_calls: Mutex::new(Vec::new()),
                visible_entries: Vec::new(),
                record,
            }
        }

        /// Sets the entries returned by `list_visible`, one per process id.
        fn with_visible_entries(mut self, process_ids: impl IntoIterator<Item = String>) -> Self {
            let mut entries: Vec<ProcessHandleGrantEntry> = Vec::new();
            for process_id in process_ids {
                let record = external_record(&process_id);
                let descriptor =
                    ProcessHandleDescriptor::new(Some("test"), Some(process_id.clone()));
                let grant = ProcessHandleGrant {
                    session_id: "session-1".to_string(),
                    process_id,
                    descriptor,
                };
                entries.push((grant, record));
            }
            self.visible_entries = entries;
            self
        }

        /// Snapshot of every `validate_visible` argument list so far.
        fn validate_calls(&self) -> Vec<Vec<String>> {
            let calls = self.validate_calls.lock().expect("validate calls");
            calls.clone()
        }

        /// Snapshot of every process id passed to `cancel` so far.
        fn cancel_calls(&self) -> Vec<String> {
            let calls = self.cancel_calls.lock().expect("cancel calls");
            calls.clone()
        }
    }

    /// Cancel ability that logs each request, then defers to the default ability.
    #[derive(Default)]
    struct RecordingCancelAbility {
        requests: Mutex<Vec<RecordedCancel>>,
    }

    impl RecordingCancelAbility {
        /// Snapshot of every request seen so far.
        fn requests(&self) -> Vec<RecordedCancel> {
            let seen = self.requests.lock().expect("cancel requests");
            seen.clone()
        }
    }

    #[async_trait::async_trait]
    impl ProcessCancelAbility for RecordingCancelAbility {
        async fn cancel(
            &self,
            processes: &dyn ProcessService,
            request: ProcessCancelRequest<'_>,
        ) -> Result<ProcessRecord, PluginError> {
            let entry: RecordedCancel = (
                request.process_id.to_string(),
                request.source,
                request.reason.clone(),
            );
            self.requests.lock().expect("cancel requests").push(entry);
            DefaultProcessCancelAbility.cancel(processes, request).await
        }
    }

    #[async_trait::async_trait]
    impl ProcessService for RecordingProcessService {
        async fn start(
            &self,
            _session_id: &str,
            _registration: ProcessRegistration,
            _options: ProcessStartOptions,
            _scope: ProcessOpScope<'_>,
        ) -> Result<ProcessRecord, PluginError> {
            Err(PluginError::Session("start not implemented".to_string()))
        }

        async fn await_process(
            &self,
            _process_id: &str,
            _scope: ProcessOpScope<'_>,
        ) -> Result<ProcessAwaitOutput, PluginError> {
            Err(PluginError::Session("await not implemented".to_string()))
        }

        async fn list_visible(
            &self,
            _session_id: &str,
            _mode: ProcessListMode,
            _scope: ProcessOpScope<'_>,
        ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
            Ok(self.visible_entries.clone())
        }

        async fn validate_visible(
            &self,
            _session_id: &str,
            process_ids: &[String],
            _scope: ProcessOpScope<'_>,
        ) -> Result<(), PluginError> {
            self.validate_calls
                .lock()
                .expect("validate calls")
                .push(process_ids.to_vec());

            // Reject on the first id that is not in the visible set.
            let first_hidden = process_ids
                .iter()
                .find(|process_id| !self.visible.contains(*process_id));
            match first_hidden {
                Some(missing) => Err(PluginError::Session(format!(
                    "process handle `{missing}` is not visible"
                ))),
                None => Ok(()),
            }
        }

        async fn cancel(
            &self,
            _session_id: &str,
            process_id: &str,
            _scope: ProcessOpScope<'_>,
        ) -> Result<ProcessRecord, PluginError> {
            self.cancel_calls
                .lock()
                .expect("cancel calls")
                .push(process_id.to_string());

            // Hand back the template record, re-labelled with the requested id.
            let mut cancelled = self.record.clone();
            cancelled.id = process_id.to_string();
            Ok(cancelled)
        }

        async fn signal(
            &self,
            _session_id: &str,
            _process_id: &str,
            _signal_id: String,
            _payload: serde_json::Value,
            _scope: ProcessOpScope<'_>,
        ) -> Result<ProcessEvent, PluginError> {
            Err(PluginError::Session("signal not implemented".to_string()))
        }

        async fn transfer(
            &self,
            _from_session_id: &str,
            _to_session_id: &str,
            _process_ids: Vec<String>,
            _scope: ProcessOpScope<'_>,
        ) -> Result<(), PluginError> {
            Err(PluginError::Session("transfer not implemented".to_string()))
        }

        async fn cancel_unreferenced(
            &self,
            _session_id: &str,
            _keep_process_ids: Vec<String>,
            _scope: ProcessOpScope<'_>,
        ) -> Result<Vec<ProcessRecord>, PluginError> {
            Err(PluginError::Session(
                "cancel unreferenced not implemented".to_string(),
            ))
        }
    }

    /// Builds an external-process record whose status is already `Cancelled`.
    fn cancelled_record(process_id: &str) -> ProcessRecord {
        let mut record = external_record(process_id);
        record.status = ProcessStatus::Cancelled {
            await_output: ProcessAwaitOutput::Cancelled {
                message: "cancelled".to_string(),
                raw: None,
                control: None,
            },
        };
        record
    }

    /// Builds a process scope backed by the inline runtime effect controller.
    fn test_process_scope(id: &str) -> ProcessOpScope<'static> {
        let controller = crate::ScopedEffectController::shared(
            Arc::new(crate::InlineRuntimeEffectController),
            crate::EffectScope::runtime_operation(id),
        )
        .expect("test effect scope");
        ProcessOpScope::new(controller)
    }

    #[tokio::test]
    async fn default_process_cancel_ability_validates_visibility_and_calls_primitive() {
        let service =
            RecordingProcessService::new(["process-1".to_string()], cancelled_record("process-1"));
        let request = ProcessCancelRequest::new(
            "session-1",
            "process-1",
            test_process_scope("cancel-visible"),
            ProcessCancelSource::HostApi,
        );

        let record = DefaultProcessCancelAbility
            .cancel(&service, request)
            .await
            .expect("cancel process");

        assert_eq!(record.status.label(), "cancelled");
        assert_eq!(
            service.validate_calls(),
            vec![vec!["process-1".to_string()]]
        );
        assert_eq!(service.cancel_calls(), vec!["process-1".to_string()]);
    }

    #[tokio::test]
    async fn default_process_cancel_ability_rejects_invisible_process_without_cancel() {
        let service = RecordingProcessService::new(Vec::<String>::new(), cancelled_record("p1"));
        let request = ProcessCancelRequest::new(
            "session-1",
            "p1",
            test_process_scope("cancel-hidden"),
            ProcessCancelSource::Tool,
        );

        let err = DefaultProcessCancelAbility
            .cancel(&service, request)
            .await
            .expect_err("hidden process should be rejected");

        assert!(err.to_string().contains("not visible"), "{err}");
        assert!(service.cancel_calls().is_empty());
    }

    #[tokio::test]
    async fn process_cancel_ability_cancel_all_visible_uses_same_cancel_path() {
        let service = RecordingProcessService::new(
            ["process-1".to_string(), "process-2".to_string()],
            cancelled_record("template"),
        )
        .with_visible_entries(["process-1".to_string(), "process-2".to_string()]);
        let ability = RecordingCancelAbility::default();
        let request = ProcessCancelAllRequest::new(
            "session-1",
            test_process_scope("cancel-all"),
            ProcessCancelSource::Tool,
        )
        .with_reason("requested by tool");

        let summaries = ability
            .cancel_all_visible(&service, request)
            .await
            .expect("cancel all visible");

        let cancelled_ids: Vec<&str> = summaries
            .iter()
            .map(|summary| summary.process_id.as_str())
            .collect();
        assert_eq!(cancelled_ids, vec!["process-1", "process-2"]);
        assert_eq!(
            ability.requests(),
            vec![
                (
                    "process-1".to_string(),
                    ProcessCancelSource::Tool,
                    Some("requested by tool".to_string())
                ),
                (
                    "process-2".to_string(),
                    ProcessCancelSource::Tool,
                    Some("requested by tool".to_string())
                )
            ]
        );
        assert_eq!(
            service.validate_calls(),
            vec![vec!["process-1".to_string()], vec!["process-2".to_string()]]
        );
        assert_eq!(
            service.cancel_calls(),
            vec!["process-1".to_string(), "process-2".to_string()]
        );
    }
}