nym_task/cancellation/token.rs
1// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::event::SentStatus;
5use std::future::Future;
6use tokio_util::sync::{
7 CancellationToken, DropGuard, WaitForCancellationFuture, WaitForCancellationFutureOwned,
8};
9use tracing::warn;
10
11/// A wrapped [CancellationToken](tokio_util::sync::CancellationToken) that is used for
12/// signalling and listening for cancellation requests.
13// We don't use CancellationToken in case we wanted to include additional fields/methods
14// down the line.
15#[derive(Debug, Clone, Default)]
16pub struct ShutdownToken {
17 inner: CancellationToken,
18}
19
20impl From<CancellationToken> for ShutdownToken {
21 fn from(inner: CancellationToken) -> Self {
22 ShutdownToken { inner }
23 }
24}
25
26impl ShutdownToken {
27 /// A drop in no-op replacement for `send_status_msg` for easier migration from [TaskClient](crate::TaskClient).
28 #[deprecated]
29 #[track_caller]
30 pub fn send_status_msg(&self, status: SentStatus) {
31 let caller = std::panic::Location::caller();
32 warn!("{caller} attempted to send {status} - there are no more listeners of those");
33 }
34
35 /// Creates a new ShutdownToken in the non-cancelled state.
36 pub fn new() -> Self {
37 ShutdownToken {
38 inner: CancellationToken::new(),
39 }
40 }
41
42 /// Creates a new ShutdownToken given a tokio `CancellationToken`.
43 pub fn new_from_tokio_token(cancellation_token: CancellationToken) -> Self {
44 ShutdownToken {
45 inner: cancellation_token,
46 }
47 }
48
49 /// Gets reference to the underlying [CancellationToken](tokio_util::sync::CancellationToken).
50 pub fn inner(&self) -> &CancellationToken {
51 &self.inner
52 }
53
54 /// Get an owned [CancellationToken](tokio_util::sync::CancellationToken) for public API use.
55 /// This is useful when you need to expose cancellation to SDK users without
56 /// exposing the internal ShutdownToken type.
57 pub fn to_cancellation_token(&self) -> CancellationToken {
58 self.inner.clone()
59 }
60
61 /// Creates a `ShutdownToken` which will get cancelled whenever the
62 /// current token gets cancelled. Unlike a cloned `ShutdownToken`,
63 /// cancelling a child token does not cancel the parent token.
64 ///
65 /// If the current token is already cancelled, the child token will get
66 /// returned in cancelled state.
67 pub fn child_token(&self) -> ShutdownToken {
68 ShutdownToken {
69 inner: self.inner.child_token(),
70 }
71 }
72
73 /// Cancel the underlying [CancellationToken](tokio_util::sync::CancellationToken) and all child tokens which had been
74 /// derived from it.
75 ///
76 /// This will wake up all tasks which are waiting for cancellation.
77 pub fn cancel(&self) {
78 self.inner.cancel();
79 }
80
81 /// Returns `true` if the underlying [CancellationToken](tokio_util::sync::CancellationToken) is cancelled.
82 pub fn is_cancelled(&self) -> bool {
83 self.inner.is_cancelled()
84 }
85
86 /// Returns a `Future` that gets fulfilled when cancellation is requested.
87 ///
88 /// The future will complete immediately if the token is already cancelled
89 /// when this method is called.
90 ///
91 /// # Cancel safety
92 ///
93 /// This method is cancel safe.
94 pub fn cancelled(&self) -> WaitForCancellationFuture<'_> {
95 self.inner.cancelled()
96 }
97
98 /// Returns a `Future` that gets fulfilled when cancellation is requested.
99 ///
100 /// The future will complete immediately if the token is already cancelled
101 /// when this method is called.
102 ///
103 /// The function takes self by value and returns a future that owns the
104 /// token.
105 ///
106 /// # Cancel safety
107 ///
108 /// This method is cancel safe.
109 pub fn cancelled_owned(self) -> WaitForCancellationFutureOwned {
110 self.inner.cancelled_owned()
111 }
112
113 /// Creates a `ShutdownDropGuard` for this token.
114 ///
115 /// Returned guard will cancel this token (and all its children) on drop
116 /// unless disarmed.
117 pub fn drop_guard(self) -> ShutdownDropGuard {
118 ShutdownDropGuard {
119 inner: self.inner.drop_guard(),
120 }
121 }
122
123 /// Runs a future to completion and returns its result wrapped inside an `Option`
124 /// unless the `ShutdownToken` is cancelled. In that case the function returns
125 /// `None` and the future gets dropped.
126 ///
127 /// # Cancel safety
128 ///
129 /// This method is only cancel safe if `fut` is cancel safe.
130 pub async fn run_until_cancelled<F>(&self, fut: F) -> Option<F::Output>
131 where
132 F: Future,
133 {
134 self.inner.run_until_cancelled(fut).await
135 }
136
137 /// Runs a future to completion and returns its result wrapped inside an `Option`
138 /// unless the `ShutdownToken` is cancelled. In that case the function returns
139 /// `None` and the future gets dropped.
140 ///
141 /// The function takes self by value and returns a future that owns the token.
142 ///
143 /// # Cancel safety
144 ///
145 /// This method is only cancel safe if `fut` is cancel safe.
146 pub async fn run_until_cancelled_owned<F>(self, fut: F) -> Option<F::Output>
147 where
148 F: Future,
149 {
150 self.inner.run_until_cancelled_owned(fut).await
151 }
152}
153
154/// A wrapper for [DropGuard](tokio_util::sync::DropGuard) that wraps around a cancellation token
155/// which automatically cancels it on drop.
156/// It is created using `drop_guard` method on the `ShutdownToken`.
157pub struct ShutdownDropGuard {
158 inner: DropGuard,
159}
160
161impl ShutdownDropGuard {
162 /// Returns stored [ShutdownToken](ShutdownToken) and removes this drop guard instance
163 /// (i.e. it will no longer cancel token). Other guards for this token
164 /// are not affected.
165 pub fn disarm(self) -> ShutdownToken {
166 ShutdownToken {
167 inner: self.inner.disarm(),
168 }
169 }
170}