opendev_runtime/interrupt.rs
1//! Centralized interrupt/cancellation token for a single agent run.
2//!
3//! One token is created per user query execution. All components (LLM caller,
4//! tool executor, HTTP client, etc.) share the same token so that a single
5//! ESC press reliably cancels the entire operation.
6//!
7//! Uses `tokio_util::sync::CancellationToken` under the hood for true
8//! async-safe cancellation, plus an `AtomicBool` for synchronous polling.
9//!
10//! Ported from `opendev/core/runtime/interrupt_token.py`.
11
12use std::sync::Arc;
13use std::sync::atomic::{AtomicBool, Ordering};
14use tokio_util::sync::CancellationToken;
15
16/// Async-safe cancellation token shared across all components of a run.
17///
18/// # Usage
19///
20/// ```rust
21/// # use opendev_runtime::InterruptToken;
22/// let token = InterruptToken::new();
23/// let child = token.clone();
24///
25/// // In the agent thread
26/// if child.is_requested() {
27/// // bail out
28/// }
29///
30/// // In the UI thread on ESC
31/// token.request();
32/// ```
33#[derive(Clone)]
34pub struct InterruptToken {
35 inner: Arc<InterruptInner>,
36}
37
38struct InterruptInner {
39 /// Synchronous flag for polling-based checks.
40 flag: AtomicBool,
41 /// Soft yield flag for backgrounding — does NOT cancel the CancellationToken.
42 background: AtomicBool,
43 /// Tokio cancellation token for async `.cancelled()` futures.
44 cancel: CancellationToken,
45}
46
47impl InterruptToken {
48 /// Create a new un-triggered token.
49 pub fn new() -> Self {
50 Self {
51 inner: Arc::new(InterruptInner {
52 flag: AtomicBool::new(false),
53 background: AtomicBool::new(false),
54 cancel: CancellationToken::new(),
55 }),
56 }
57 }
58
59 /// Signal that the user wants to cancel the current operation.
60 ///
61 /// This is cheap and idempotent — calling it multiple times is fine.
62 pub fn request(&self) {
63 self.inner.flag.store(true, Ordering::Release);
64 self.inner.cancel.cancel();
65 }
66
67 /// Force-interrupt the current operation.
68 ///
69 /// In the Rust implementation this is identical to `request()` since we rely
70 /// on cooperative cancellation via the `CancellationToken` rather than
71 /// injecting async exceptions into threads (which is a CPython-specific trick).
72 pub fn force_interrupt(&self) {
73 self.request();
74 }
75
76 /// Request that the current operation be moved to the background.
77 ///
78 /// This cancels the `CancellationToken` to immediately interrupt any
79 /// in-flight async operation (LLM streaming, tool execution), but does
80 /// NOT set the hard interrupt `flag`. The react loop distinguishes
81 /// background from hard interrupt by checking `is_background_requested()`.
82 pub fn request_background(&self) {
83 self.inner.background.store(true, Ordering::Release);
84 self.inner.cancel.cancel();
85 }
86
87 /// Check whether backgrounding has been requested.
88 pub fn is_background_requested(&self) -> bool {
89 self.inner.background.load(Ordering::Acquire)
90 }
91
92 /// Check whether cancellation has been requested.
93 ///
94 /// This is a cheap atomic load suitable for hot polling loops.
95 pub fn is_requested(&self) -> bool {
96 self.inner.flag.load(Ordering::Acquire)
97 }
98
99 /// Return an error if cancellation was requested.
100 pub fn throw_if_requested(&self) -> Result<(), InterruptedError> {
101 if self.is_requested() {
102 Err(InterruptedError)
103 } else {
104 Ok(())
105 }
106 }
107
108 /// Wait until cancellation is requested.
109 ///
110 /// This is the primary async integration point — select! against this
111 /// alongside your actual work future.
112 pub async fn cancelled(&self) {
113 self.inner.cancel.cancelled().await;
114 }
115
116 /// Get the underlying `tokio_util::sync::CancellationToken`.
117 ///
118 /// Useful when you need to pass a token to lower-level async code
119 /// or create child tokens.
120 pub fn cancellation_token(&self) -> &CancellationToken {
121 &self.inner.cancel
122 }
123
124 /// Create a child token that is cancelled when the parent is cancelled.
125 pub fn child_token(&self) -> CancellationToken {
126 self.inner.cancel.child_token()
127 }
128
129 /// Clear the cancellation signal (use with care — mainly for token reuse
130 /// across multiple agent runs).
131 pub fn reset(&self) {
132 self.inner.flag.store(false, Ordering::Release);
133 // Note: CancellationToken cannot be un-cancelled. For multi-run reuse
134 // the caller should create a new InterruptToken instead.
135 }
136
137 // ------------------------------------------------------------------
138 // TaskMonitor compatibility
139 // ------------------------------------------------------------------
140
141 /// Alias for `is_requested()` — TaskMonitor compatibility.
142 pub fn should_interrupt(&self) -> bool {
143 self.is_requested()
144 }
145
146 /// Alias for `request()` — TaskMonitor compatibility.
147 pub fn request_interrupt(&self) {
148 self.request();
149 }
150}
151
152impl Default for InterruptToken {
153 fn default() -> Self {
154 Self::new()
155 }
156}
157
158impl std::fmt::Debug for InterruptToken {
159 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160 f.debug_struct("InterruptToken")
161 .field("requested", &self.is_requested())
162 .finish()
163 }
164}
165
166/// Error raised when an operation is interrupted by the user.
167#[derive(Debug, Clone, thiserror::Error)]
168#[error("Interrupted by user")]
169pub struct InterruptedError;
170
171#[cfg(test)]
172#[path = "interrupt_tests.rs"]
173mod tests;