1use crate::plugin::PluginError;
2
3use super::events::{ProcessAwaitOutput, ProcessEvent};
4use super::model::{
5 ProcessCancelSummary, ProcessHandleGrantEntry, ProcessHandleSummary, ProcessListMode,
6 ProcessOpScope, ProcessRecord, ProcessRegistration, ProcessStartOptions, ProcessStartRequest,
7};
8
/// Tags a cancellation request with the kind of caller that issued it.
///
/// The value is stored on [`ProcessCancelRequest`] and
/// [`ProcessCancelAllRequest`] and is available to every
/// [`ProcessCancelAbility`] implementation.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum ProcessCancelSource {
    /// Cancellation attributed to a tool (presumably a tool invocation — confirm with callers).
    Tool,
    /// Cancellation attributed to a process (presumably another process — confirm with callers).
    Process,
    /// Cancellation attributed to the host API (presumably the embedding host — confirm with callers).
    HostApi,
}
15
16#[derive(Clone)]
17pub struct ProcessCancelRequest<'scope> {
18 pub session_id: &'scope str,
19 pub process_id: &'scope str,
20 pub handle: Option<serde_json::Value>,
21 pub scope: ProcessOpScope<'scope>,
22 pub reason: Option<String>,
23 pub source: ProcessCancelSource,
24}
25
26impl<'scope> ProcessCancelRequest<'scope> {
27 pub fn new(
28 session_id: &'scope str,
29 process_id: &'scope str,
30 scope: ProcessOpScope<'scope>,
31 source: ProcessCancelSource,
32 ) -> Self {
33 Self {
34 session_id,
35 process_id,
36 handle: None,
37 scope,
38 reason: None,
39 source,
40 }
41 }
42
43 pub fn with_handle(mut self, handle: serde_json::Value) -> Self {
44 self.handle = Some(handle);
45 self
46 }
47
48 pub fn with_reason(mut self, reason: impl Into<String>) -> Self {
49 self.reason = Some(reason.into());
50 self
51 }
52}
53
54#[derive(Clone)]
55pub struct ProcessCancelAllRequest<'scope> {
56 pub session_id: &'scope str,
57 pub scope: ProcessOpScope<'scope>,
58 pub source: ProcessCancelSource,
59 pub reason: Option<String>,
60}
61
62impl<'scope> ProcessCancelAllRequest<'scope> {
63 pub fn new(
64 session_id: &'scope str,
65 scope: ProcessOpScope<'scope>,
66 source: ProcessCancelSource,
67 ) -> Self {
68 Self {
69 session_id,
70 scope,
71 source,
72 reason: None,
73 }
74 }
75
76 pub fn with_reason(mut self, reason: impl Into<String>) -> Self {
77 self.reason = Some(reason.into());
78 self
79 }
80}
81
82#[async_trait::async_trait]
83pub trait ProcessCancelAbility: Send + Sync {
84 async fn cancel(
85 &self,
86 processes: &dyn ProcessService,
87 request: ProcessCancelRequest<'_>,
88 ) -> Result<ProcessRecord, PluginError>;
89
90 async fn cancel_summary(
91 &self,
92 processes: &dyn ProcessService,
93 request: ProcessCancelRequest<'_>,
94 ) -> Result<ProcessCancelSummary, PluginError> {
95 self.cancel(processes, request)
96 .await
97 .map(ProcessCancelSummary::from_record)
98 }
99
100 async fn cancel_all_visible(
101 &self,
102 processes: &dyn ProcessService,
103 request: ProcessCancelAllRequest<'_>,
104 ) -> Result<Vec<ProcessCancelSummary>, PluginError> {
105 let entries = processes
106 .list_visible(
107 request.session_id,
108 ProcessListMode::Live,
109 request.scope.clone(),
110 )
111 .await?;
112 let mut cancelled = Vec::new();
113 for (grant, record) in entries {
114 if record.is_terminal() {
115 continue;
116 }
117 let mut cancel_request = ProcessCancelRequest::new(
118 request.session_id,
119 &grant.process_id,
120 request.scope.clone(),
121 request.source,
122 );
123 if let Some(reason) = request.reason.clone() {
124 cancel_request = cancel_request.with_reason(reason);
125 }
126 cancelled.push(self.cancel_summary(processes, cancel_request).await?);
127 }
128 Ok(cancelled)
129 }
130}
131
/// Stateless [`ProcessCancelAbility`] that checks visibility through the
/// process service before delegating to its `cancel` primitive.
#[derive(Debug, Default, Clone, Copy)]
pub struct DefaultProcessCancelAbility;
134
135#[async_trait::async_trait]
136impl ProcessCancelAbility for DefaultProcessCancelAbility {
137 async fn cancel(
138 &self,
139 processes: &dyn ProcessService,
140 request: ProcessCancelRequest<'_>,
141 ) -> Result<ProcessRecord, PluginError> {
142 let process_ids = [request.process_id.to_string()];
143 processes
144 .validate_visible(request.session_id, &process_ids, request.scope.clone())
145 .await?;
146 processes
147 .cancel(request.session_id, request.process_id, request.scope)
148 .await
149 }
150}
151
152#[async_trait::async_trait]
153pub trait ProcessService: Send + Sync {
154 async fn start_from_request(
155 &self,
156 session_id: &str,
157 request: ProcessStartRequest,
158 scope: ProcessOpScope<'_>,
159 ) -> Result<ProcessHandleSummary, PluginError> {
160 let _ = (session_id, request, scope);
161 Err(PluginError::Session(
162 "process start request composition is unavailable in this service".to_string(),
163 ))
164 }
165
166 async fn start(
167 &self,
168 session_id: &str,
169 registration: ProcessRegistration,
170 options: ProcessStartOptions,
171 scope: ProcessOpScope<'_>,
172 ) -> Result<ProcessRecord, PluginError>;
173
174 async fn complete_external(
179 &self,
180 session_id: &str,
181 process_id: &str,
182 await_output: ProcessAwaitOutput,
183 scope: ProcessOpScope<'_>,
184 ) -> Result<ProcessRecord, PluginError> {
185 let _ = (session_id, process_id, await_output, scope);
186 Err(PluginError::Session(
187 "external process completion is unavailable in this service".to_string(),
188 ))
189 }
190
191 async fn await_process(
192 &self,
193 process_id: &str,
194 scope: ProcessOpScope<'_>,
195 ) -> Result<ProcessAwaitOutput, PluginError>;
196
197 async fn list_visible(
198 &self,
199 session_id: &str,
200 mode: ProcessListMode,
201 scope: ProcessOpScope<'_>,
202 ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError>;
203
204 async fn validate_visible(
205 &self,
206 session_id: &str,
207 process_ids: &[String],
208 scope: ProcessOpScope<'_>,
209 ) -> Result<(), PluginError>;
210
211 async fn cancel(
212 &self,
213 session_id: &str,
214 process_id: &str,
215 scope: ProcessOpScope<'_>,
216 ) -> Result<ProcessRecord, PluginError>;
217
218 async fn signal(
219 &self,
220 session_id: &str,
221 process_id: &str,
222 signal_name: String,
223 signal_id: String,
224 payload: serde_json::Value,
225 scope: ProcessOpScope<'_>,
226 ) -> Result<ProcessEvent, PluginError>;
227
228 async fn transfer(
229 &self,
230 from_session_id: &str,
231 to_session_id: &str,
232 process_ids: Vec<String>,
233 scope: ProcessOpScope<'_>,
234 ) -> Result<(), PluginError>;
235
236 async fn cancel_unreferenced(
237 &self,
238 session_id: &str,
239 keep_process_ids: Vec<String>,
240 scope: ProcessOpScope<'_>,
241 ) -> Result<Vec<ProcessRecord>, PluginError>;
242}
243
/// [`ProcessService`] placeholder for runtimes without process support.
///
/// Its operations fail with `PluginError::Session`; the single exception is
/// `transfer`, which succeeds when given no process ids.
///
/// Derives the same common traits as `DefaultProcessCancelAbility` so the
/// type can be constructed via `Default`, copied freely and logged.
#[derive(Clone, Copy, Debug, Default)]
pub struct UnavailableProcessService;
245
246#[async_trait::async_trait]
247impl ProcessService for UnavailableProcessService {
248 async fn start(
249 &self,
250 _session_id: &str,
251 _registration: ProcessRegistration,
252 _options: ProcessStartOptions,
253 _scope: ProcessOpScope<'_>,
254 ) -> Result<ProcessRecord, PluginError> {
255 Err(PluginError::Session(
256 "processes are unavailable in this runtime".to_string(),
257 ))
258 }
259
260 async fn await_process(
261 &self,
262 _process_id: &str,
263 _scope: ProcessOpScope<'_>,
264 ) -> Result<ProcessAwaitOutput, PluginError> {
265 Err(PluginError::Session(
266 "process awaiting is unavailable in this runtime".to_string(),
267 ))
268 }
269
270 async fn list_visible(
271 &self,
272 _session_id: &str,
273 _mode: ProcessListMode,
274 _scope: ProcessOpScope<'_>,
275 ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
276 Err(PluginError::Session(
277 "process registry is unavailable in this runtime".to_string(),
278 ))
279 }
280
281 async fn validate_visible(
282 &self,
283 _session_id: &str,
284 _process_ids: &[String],
285 _scope: ProcessOpScope<'_>,
286 ) -> Result<(), PluginError> {
287 Err(PluginError::Session(
288 "process handle validation is unavailable in this runtime".to_string(),
289 ))
290 }
291
292 async fn cancel(
293 &self,
294 _session_id: &str,
295 _process_id: &str,
296 _scope: ProcessOpScope<'_>,
297 ) -> Result<ProcessRecord, PluginError> {
298 Err(PluginError::Session(
299 "process registry is unavailable in this runtime".to_string(),
300 ))
301 }
302
303 async fn signal(
304 &self,
305 _session_id: &str,
306 _process_id: &str,
307 _signal_name: String,
308 _signal_id: String,
309 _payload: serde_json::Value,
310 _scope: ProcessOpScope<'_>,
311 ) -> Result<ProcessEvent, PluginError> {
312 Err(PluginError::Session(
313 "process signalling is unavailable in this runtime".to_string(),
314 ))
315 }
316
317 async fn transfer(
318 &self,
319 _from_session_id: &str,
320 _to_session_id: &str,
321 process_ids: Vec<String>,
322 _scope: ProcessOpScope<'_>,
323 ) -> Result<(), PluginError> {
324 if process_ids.is_empty() {
325 return Ok(());
326 }
327 Err(PluginError::Session(
328 "process handle transfer is unavailable in this runtime".to_string(),
329 ))
330 }
331
332 async fn cancel_unreferenced(
333 &self,
334 _session_id: &str,
335 _keep_process_ids: Vec<String>,
336 _scope: ProcessOpScope<'_>,
337 ) -> Result<Vec<ProcessRecord>, PluginError> {
338 Err(PluginError::Session(
339 "process handle cleanup is unavailable in this runtime".to_string(),
340 ))
341 }
342}
343
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::{Arc, Mutex};

    use serde_json::json;

    use super::*;
    use crate::{
        ProcessAwaitOutput, ProcessEvent, ProcessHandleDescriptor, ProcessHandleGrant,
        ProcessInput, ProcessProvenance, ProcessRegistration, ProcessStatus,
    };

    /// One logged cancel request: process id, source and optional reason.
    type LoggedCancel = (String, ProcessCancelSource, Option<String>);

    /// Shared failure for service operations the tests never exercise.
    fn unimplemented_op<T>(message: &str) -> Result<T, PluginError> {
        Err(PluginError::Session(message.to_string()))
    }

    /// Fresh record for an externally owned process with null metadata.
    fn external_record(process_id: &str) -> ProcessRecord {
        let registration = ProcessRegistration::new(
            process_id,
            ProcessInput::External {
                metadata: json!(null),
            },
            crate::RecoveryDisposition::ExternallyOwned,
            ProcessProvenance::host(),
        );
        ProcessRecord::from_registration(registration)
    }

    /// Test double that logs `validate_visible` / `cancel` calls and answers
    /// from fixed in-memory data.
    struct RecordingProcessService {
        /// Ids accepted by `validate_visible`.
        visible: HashSet<String>,
        /// Argument log for `validate_visible`, in call order.
        validate_calls: Mutex<Vec<Vec<String>>>,
        /// Argument log for `cancel`, in call order.
        cancel_calls: Mutex<Vec<String>>,
        /// Entries returned verbatim by `list_visible`.
        visible_entries: Vec<ProcessHandleGrantEntry>,
        /// Template record returned by `cancel` with its id overwritten.
        record: ProcessRecord,
    }

    impl RecordingProcessService {
        /// Creates a service that treats `visible` as the visible id set and
        /// answers `cancel` with a copy of `record`.
        fn new(visible: impl IntoIterator<Item = String>, record: ProcessRecord) -> Self {
            let visible: HashSet<String> = visible.into_iter().collect();
            Self {
                visible,
                validate_calls: Mutex::new(Vec::new()),
                cancel_calls: Mutex::new(Vec::new()),
                visible_entries: Vec::new(),
                record,
            }
        }

        /// Replaces the `list_visible` answer with one live entry per id.
        fn with_visible_entries(self, process_ids: impl IntoIterator<Item = String>) -> Self {
            let mut entries = Vec::new();
            for process_id in process_ids {
                let record = external_record(&process_id);
                let descriptor =
                    ProcessHandleDescriptor::new(Some("test"), Some(process_id.clone()));
                let grant = ProcessHandleGrant {
                    session_id: "session-1".to_string(),
                    process_id,
                    descriptor,
                };
                entries.push((grant, record));
            }
            Self {
                visible_entries: entries,
                ..self
            }
        }

        /// Snapshot of the `validate_visible` argument log.
        fn validate_calls(&self) -> Vec<Vec<String>> {
            let log = self.validate_calls.lock().expect("validate calls");
            log.clone()
        }

        /// Snapshot of the `cancel` argument log.
        fn cancel_calls(&self) -> Vec<String> {
            let log = self.cancel_calls.lock().expect("cancel calls");
            log.clone()
        }
    }

    /// Cancel ability that logs each request before delegating to
    /// `DefaultProcessCancelAbility`.
    #[derive(Default)]
    struct RecordingCancelAbility {
        requests: Mutex<Vec<LoggedCancel>>,
    }

    impl RecordingCancelAbility {
        /// Snapshot of the logged requests, in call order.
        fn requests(&self) -> Vec<LoggedCancel> {
            let log = self.requests.lock().expect("cancel requests");
            log.clone()
        }
    }

    #[async_trait::async_trait]
    impl ProcessCancelAbility for RecordingCancelAbility {
        async fn cancel(
            &self,
            processes: &dyn ProcessService,
            request: ProcessCancelRequest<'_>,
        ) -> Result<ProcessRecord, PluginError> {
            let entry = (
                request.process_id.to_string(),
                request.source,
                request.reason.clone(),
            );
            self.requests.lock().expect("cancel requests").push(entry);
            DefaultProcessCancelAbility.cancel(processes, request).await
        }
    }

    #[async_trait::async_trait]
    impl ProcessService for RecordingProcessService {
        async fn start(
            &self,
            _session_id: &str,
            _registration: ProcessRegistration,
            _options: ProcessStartOptions,
            _scope: ProcessOpScope<'_>,
        ) -> Result<ProcessRecord, PluginError> {
            unimplemented_op("start not implemented")
        }

        async fn await_process(
            &self,
            _process_id: &str,
            _scope: ProcessOpScope<'_>,
        ) -> Result<ProcessAwaitOutput, PluginError> {
            unimplemented_op("await not implemented")
        }

        async fn list_visible(
            &self,
            _session_id: &str,
            _mode: ProcessListMode,
            _scope: ProcessOpScope<'_>,
        ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
            Ok(self.visible_entries.clone())
        }

        async fn validate_visible(
            &self,
            _session_id: &str,
            process_ids: &[String],
            _scope: ProcessOpScope<'_>,
        ) -> Result<(), PluginError> {
            // Log the call first so rejected validations are still observable.
            self.validate_calls
                .lock()
                .expect("validate calls")
                .push(process_ids.to_vec());

            let first_hidden = process_ids
                .iter()
                .find(|candidate| !self.visible.contains(candidate.as_str()));
            match first_hidden {
                Some(missing) => Err(PluginError::Session(format!(
                    "process handle `{missing}` is not visible"
                ))),
                None => Ok(()),
            }
        }

        async fn cancel(
            &self,
            _session_id: &str,
            process_id: &str,
            _scope: ProcessOpScope<'_>,
        ) -> Result<ProcessRecord, PluginError> {
            let requested = process_id.to_owned();
            self.cancel_calls
                .lock()
                .expect("cancel calls")
                .push(requested.clone());

            // Answer with the template record, re-labelled for this process.
            let mut answer = self.record.clone();
            answer.id = requested;
            Ok(answer)
        }

        async fn signal(
            &self,
            _session_id: &str,
            _process_id: &str,
            _signal_name: String,
            _signal_id: String,
            _payload: serde_json::Value,
            _scope: ProcessOpScope<'_>,
        ) -> Result<ProcessEvent, PluginError> {
            unimplemented_op("signal not implemented")
        }

        async fn transfer(
            &self,
            _from_session_id: &str,
            _to_session_id: &str,
            _process_ids: Vec<String>,
            _scope: ProcessOpScope<'_>,
        ) -> Result<(), PluginError> {
            unimplemented_op("transfer not implemented")
        }

        async fn cancel_unreferenced(
            &self,
            _session_id: &str,
            _keep_process_ids: Vec<String>,
            _scope: ProcessOpScope<'_>,
        ) -> Result<Vec<ProcessRecord>, PluginError> {
            unimplemented_op("cancel unreferenced not implemented")
        }
    }

    /// Record for `process_id` whose status is already `Cancelled`.
    fn cancelled_record(process_id: &str) -> ProcessRecord {
        let await_output = ProcessAwaitOutput::Cancelled {
            message: "cancelled".to_string(),
            raw: None,
            control: None,
        };
        let mut record = external_record(process_id);
        record.status = ProcessStatus::Cancelled { await_output };
        record
    }

    /// Builds a `ProcessOpScope` backed by the inline runtime effect controller.
    fn test_process_scope(id: &str) -> ProcessOpScope<'static> {
        let controller = Arc::new(crate::InlineRuntimeEffectController);
        let execution_scope = crate::ExecutionScope::runtime_operation(id);
        let effects = crate::ScopedEffectController::shared(controller, execution_scope)
            .expect("test execution scope");
        ProcessOpScope::new(effects)
    }

    #[tokio::test]
    async fn default_process_cancel_ability_validates_visibility_and_calls_primitive() {
        let target = "process-1";
        let service =
            RecordingProcessService::new([target.to_string()], cancelled_record(target));
        let request = ProcessCancelRequest::new(
            "session-1",
            target,
            test_process_scope("cancel-visible"),
            ProcessCancelSource::HostApi,
        );

        let record = DefaultProcessCancelAbility
            .cancel(&service, request)
            .await
            .expect("cancel process");

        assert_eq!(record.status.label(), "cancelled");
        assert_eq!(service.validate_calls(), vec![vec![target.to_string()]]);
        assert_eq!(service.cancel_calls(), vec![target.to_string()]);
    }

    #[tokio::test]
    async fn default_process_cancel_ability_rejects_invisible_process_without_cancel() {
        let nothing_visible = Vec::<String>::new();
        let service = RecordingProcessService::new(nothing_visible, cancelled_record("p1"));
        let request = ProcessCancelRequest::new(
            "session-1",
            "p1",
            test_process_scope("cancel-hidden"),
            ProcessCancelSource::Tool,
        );

        let err = DefaultProcessCancelAbility
            .cancel(&service, request)
            .await
            .expect_err("hidden process should be rejected");

        assert!(err.to_string().contains("not visible"), "{err}");
        assert!(service.cancel_calls().is_empty());
    }

    #[tokio::test]
    async fn process_cancel_ability_cancel_all_visible_uses_same_cancel_path() {
        let ids = ["process-1", "process-2"];
        let owned_ids = ids.map(String::from);
        let service =
            RecordingProcessService::new(owned_ids.clone(), cancelled_record("template"))
                .with_visible_entries(owned_ids.clone());
        let ability = RecordingCancelAbility::default();
        let request = ProcessCancelAllRequest::new(
            "session-1",
            test_process_scope("cancel-all"),
            ProcessCancelSource::Tool,
        )
        .with_reason("requested by tool");

        let summaries = ability
            .cancel_all_visible(&service, request)
            .await
            .expect("cancel all visible");

        let cancelled_ids: Vec<&str> = summaries
            .iter()
            .map(|summary| summary.process_id.as_str())
            .collect();
        assert_eq!(cancelled_ids, ids.to_vec());

        let expected_requests: Vec<LoggedCancel> = owned_ids
            .iter()
            .cloned()
            .map(|id| {
                (
                    id,
                    ProcessCancelSource::Tool,
                    Some("requested by tool".to_string()),
                )
            })
            .collect();
        assert_eq!(ability.requests(), expected_requests);

        let expected_validations: Vec<Vec<String>> =
            owned_ids.iter().map(|id| vec![id.clone()]).collect();
        assert_eq!(service.validate_calls(), expected_validations);

        assert_eq!(service.cancel_calls(), owned_ids.to_vec());
    }
}