Skip to main content

opendev_runtime/
interrupt.rs

1//! Centralized interrupt/cancellation token for a single agent run.
2//!
3//! One token is created per user query execution. All components (LLM caller,
4//! tool executor, HTTP client, etc.) share the same token so that a single
5//! ESC press reliably cancels the entire operation.
6//!
7//! Uses `tokio_util::sync::CancellationToken` under the hood for true
8//! async-safe cancellation, plus an `AtomicBool` for synchronous polling.
9//!
10//! Ported from `opendev/core/runtime/interrupt_token.py`.
11
12use std::sync::Arc;
13use std::sync::atomic::{AtomicBool, Ordering};
14use tokio_util::sync::CancellationToken;
15
16/// Async-safe cancellation token shared across all components of a run.
17///
18/// # Usage
19///
20/// ```rust
21/// # use opendev_runtime::InterruptToken;
22/// let token = InterruptToken::new();
23/// let child = token.clone();
24///
25/// // In the agent thread
26/// if child.is_requested() {
27///     // bail out
28/// }
29///
30/// // In the UI thread on ESC
31/// token.request();
32/// ```
33#[derive(Clone)]
34pub struct InterruptToken {
35    inner: Arc<InterruptInner>,
36}
37
38struct InterruptInner {
39    /// Synchronous flag for polling-based checks.
40    flag: AtomicBool,
41    /// Soft yield flag for backgrounding — does NOT cancel the CancellationToken.
42    background: AtomicBool,
43    /// Tokio cancellation token for async `.cancelled()` futures.
44    cancel: CancellationToken,
45}
46
47impl InterruptToken {
48    /// Create a new un-triggered token.
49    pub fn new() -> Self {
50        Self {
51            inner: Arc::new(InterruptInner {
52                flag: AtomicBool::new(false),
53                background: AtomicBool::new(false),
54                cancel: CancellationToken::new(),
55            }),
56        }
57    }
58
59    /// Signal that the user wants to cancel the current operation.
60    ///
61    /// This is cheap and idempotent — calling it multiple times is fine.
62    pub fn request(&self) {
63        self.inner.flag.store(true, Ordering::Release);
64        self.inner.cancel.cancel();
65    }
66
67    /// Force-interrupt the current operation.
68    ///
69    /// In the Rust implementation this is identical to `request()` since we rely
70    /// on cooperative cancellation via the `CancellationToken` rather than
71    /// injecting async exceptions into threads (which is a CPython-specific trick).
72    pub fn force_interrupt(&self) {
73        self.request();
74    }
75
76    /// Request that the current operation be moved to the background.
77    ///
78    /// This cancels the `CancellationToken` to immediately interrupt any
79    /// in-flight async operation (LLM streaming, tool execution), but does
80    /// NOT set the hard interrupt `flag`. The react loop distinguishes
81    /// background from hard interrupt by checking `is_background_requested()`.
82    pub fn request_background(&self) {
83        self.inner.background.store(true, Ordering::Release);
84        self.inner.cancel.cancel();
85    }
86
87    /// Check whether backgrounding has been requested.
88    pub fn is_background_requested(&self) -> bool {
89        self.inner.background.load(Ordering::Acquire)
90    }
91
92    /// Check whether cancellation has been requested.
93    ///
94    /// This is a cheap atomic load suitable for hot polling loops.
95    pub fn is_requested(&self) -> bool {
96        self.inner.flag.load(Ordering::Acquire)
97    }
98
99    /// Return an error if cancellation was requested.
100    pub fn throw_if_requested(&self) -> Result<(), InterruptedError> {
101        if self.is_requested() {
102            Err(InterruptedError)
103        } else {
104            Ok(())
105        }
106    }
107
108    /// Wait until cancellation is requested.
109    ///
110    /// This is the primary async integration point — select! against this
111    /// alongside your actual work future.
112    pub async fn cancelled(&self) {
113        self.inner.cancel.cancelled().await;
114    }
115
116    /// Get the underlying `tokio_util::sync::CancellationToken`.
117    ///
118    /// Useful when you need to pass a token to lower-level async code
119    /// or create child tokens.
120    pub fn cancellation_token(&self) -> &CancellationToken {
121        &self.inner.cancel
122    }
123
124    /// Create a child token that is cancelled when the parent is cancelled.
125    pub fn child_token(&self) -> CancellationToken {
126        self.inner.cancel.child_token()
127    }
128
129    /// Clear the cancellation signal (use with care — mainly for token reuse
130    /// across multiple agent runs).
131    pub fn reset(&self) {
132        self.inner.flag.store(false, Ordering::Release);
133        // Note: CancellationToken cannot be un-cancelled. For multi-run reuse
134        // the caller should create a new InterruptToken instead.
135    }
136
137    // ------------------------------------------------------------------
138    // TaskMonitor compatibility
139    // ------------------------------------------------------------------
140
141    /// Alias for `is_requested()` — TaskMonitor compatibility.
142    pub fn should_interrupt(&self) -> bool {
143        self.is_requested()
144    }
145
146    /// Alias for `request()` — TaskMonitor compatibility.
147    pub fn request_interrupt(&self) {
148        self.request();
149    }
150}
151
152impl Default for InterruptToken {
153    fn default() -> Self {
154        Self::new()
155    }
156}
157
158impl std::fmt::Debug for InterruptToken {
159    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160        f.debug_struct("InterruptToken")
161            .field("requested", &self.is_requested())
162            .finish()
163    }
164}
165
166/// Error raised when an operation is interrupted by the user.
167#[derive(Debug, Clone, thiserror::Error)]
168#[error("Interrupted by user")]
169pub struct InterruptedError;
170
171#[cfg(test)]
172#[path = "interrupt_tests.rs"]
173mod tests;