use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use pyo3::prelude::*;
#[pyclass(name = "CancellationToken", skip_from_py_object)]
#[derive(Clone)]
pub struct PyCancellationToken {
pub(crate) cancelled: Arc<AtomicBool>,
}
#[pymethods]
impl PyCancellationToken {
#[new]
pub fn new() -> Self {
Self {
cancelled: Arc::new(AtomicBool::new(false)),
}
}
pub fn cancel(&self) {
self.cancelled.store(true, Ordering::Relaxed);
}
pub fn is_cancelled(&self) -> bool {
self.cancelled.load(Ordering::Relaxed)
}
pub fn reset(&self) {
self.cancelled.store(false, Ordering::Relaxed);
}
fn __repr__(&self) -> String {
format!(
"CancellationToken(cancelled={})",
self.cancelled.load(Ordering::Relaxed)
)
}
}
impl Default for PyCancellationToken {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_initial_state_not_cancelled() {
let t = PyCancellationToken::new();
assert!(!t.is_cancelled());
}
#[test]
fn test_cancel_sets_flag() {
let t = PyCancellationToken::new();
t.cancel();
assert!(t.is_cancelled());
}
#[test]
fn test_reset_clears_flag() {
let t = PyCancellationToken::new();
t.cancel();
t.reset();
assert!(!t.is_cancelled());
}
#[test]
fn test_clone_shares_state() {
let t = PyCancellationToken::new();
let t2 = t.clone();
t.cancel();
assert!(t2.is_cancelled());
}
}