nym-task 1.21.1

Task handling
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0

use std::{
    error::Error,
    sync::atomic::{AtomicBool, Ordering},
    time::Duration,
};

use futures::{FutureExt, SinkExt, StreamExt, future::pending};
use log::{Level, log};
use tokio::sync::{
    mpsc,
    watch::{self, error::SendError},
};

use crate::event::{SentStatus, StatusReceiver, StatusSender, TaskStatus};

#[cfg(not(target_arch = "wasm32"))]
use tokio::time::{sleep, timeout};

#[cfg(target_arch = "wasm32")]
use wasmtimer::tokio::{sleep, timeout};

const DEFAULT_SHUTDOWN_TIMER_SECS: u64 = 5;

pub(crate) type SentError = Box<dyn Error + Send + Sync>;
type ErrorSender = mpsc::UnboundedSender<SentError>;
type ErrorReceiver = mpsc::UnboundedReceiver<SentError>;

/// Returns the task name for display purposes, falling back to `"unknown"` when the task
/// was never given a name.
///
/// The `&Option<String>` parameter shape is kept because the `TaskError` format string
/// passes the `shutdown_name` field to this function by reference.
fn try_recover_name(name: &Option<String>) -> String {
    name.as_deref().unwrap_or("unknown").to_owned()
}

/// Errors produced by this module on behalf of task clients.
#[derive(thiserror::Error, Debug)]
enum TaskError {
    /// Reported by an armed `TaskClient` that is dropped before it observed a shutdown signal.
    /// `shutdown_name` is the client's name, if it had one; the rendered message falls back to
    /// "unknown" (via `try_recover_name`) when it did not.
    #[error("Task '{}' halted unexpectedly", try_recover_name(.shutdown_name))]
    UnexpectedHalt { shutdown_name: Option<String> },
}

/// Listens to status and error messages from tasks, and notifies them when it is time to
/// shut down gracefully. Also keeps track of tasks that stop unexpectedly, such as in a panic
/// (a `TaskClient` dropped before it observed a shutdown reports this on a dedicated channel).
#[deprecated(note = "use ShutdownManager instead")]
#[derive(Debug)]
pub struct TaskManager {
    // optional name assigned to the task manager that all subscribed task clients will inherit
    name: Option<String>,

    // These channels have the dual purpose of signalling that it's time to shut down, and of
    // keeping track of which tasks we are still waiting for: every subscribed `TaskClient`
    // holds a clone of the receiver, so `notify_tx.closed()` resolves once all of them are gone.
    notify_tx: watch::Sender<()>,
    // The manager's own receiver; it is released (set to `None`) while waiting for shutdown.
    notify_rx: Option<watch::Receiver<()>>,
    // Number of seconds `wait_for_shutdown` waits before forcing shutdown.
    #[cfg_attr(target_arch = "wasm32", allow(dead_code))]
    shutdown_timer_secs: u64,

    // If any task fails, it needs to report that separately. The receiver can be taken only once.
    task_return_error_tx: ErrorSender,
    task_return_error_rx: Option<ErrorReceiver>,

    // Also signal when a client is dropped, in case the task exits unexpectedly.
    // Why are we not reusing the return error channel? Because with a single channel it was not
    // possible to reliably get the explicitly sent error (rather than the one sent during drop).
    task_drop_tx: ErrorSender,
    task_drop_rx: Option<ErrorReceiver>,

    // While running, a task might also send non-fatal errors (effectively, warnings) that are
    // not the result of it exiting. The receiver is taken by `start_status_listener`.
    task_status_tx: StatusSender,
    task_status_rx: Option<StatusReceiver>,
}

#[allow(deprecated)]
impl Default for TaskManager {
    fn default() -> Self {
        let (notify_tx, notify_rx) = watch::channel(());
        let (task_halt_tx, task_halt_rx) = mpsc::unbounded_channel();
        let (task_drop_tx, task_drop_rx) = mpsc::unbounded_channel();
        // The status channel is bounded (unlike the others), since it's not always the case that
        // there is a listener.
        let (task_status_tx, task_status_rx) = futures::channel::mpsc::channel(128);
        Self {
            name: None,
            notify_tx,
            notify_rx: Some(notify_rx),
            shutdown_timer_secs: DEFAULT_SHUTDOWN_TIMER_SECS,
            task_return_error_tx: task_halt_tx,
            task_return_error_rx: Some(task_halt_rx),
            task_drop_tx,
            task_drop_rx: Some(task_drop_rx),
            task_status_tx,
            task_status_rx: Some(task_status_rx),
        }
    }
}

#[allow(deprecated)]
#[allow(clippy::expect_used)]
impl TaskManager {
    /// Creates an unnamed manager whose `wait_for_shutdown` waits up to `shutdown_timer_secs`
    /// seconds for tasks to finish before forcing shutdown.
    pub fn new(shutdown_timer_secs: u64) -> Self {
        Self {
            shutdown_timer_secs,
            ..Default::default()
        }
    }

    /// Assigns a name to this manager; clients created by `subscribe`/`subscribe_named`
    /// derive their names from it.
    #[must_use]
    pub fn named<S: Into<String>>(mut self, name: S) -> Self {
        self.name = Some(name.into());
        self
    }

    /// Awaits `crate::wait_for_signal_and_error` (defined elsewhere; presumably it waits for an
    /// interrupt signal or a task error), then signals shutdown and waits for the tasks to finish.
    ///
    /// Returns the result produced by `crate::wait_for_signal_and_error`.
    #[cfg(not(target_arch = "wasm32"))]
    pub async fn catch_interrupt(&mut self) -> Result<(), SentError> {
        let res = crate::wait_for_signal_and_error(self).await;

        log::info!("Sending shutdown");
        // Failure only means nobody is listening any more, which is fine at this point.
        self.signal_shutdown().ok();

        log::info!("Waiting for tasks to finish... (Press ctrl-c to force)");
        self.wait_for_shutdown().await;

        res
    }

    /// Creates a new [`TaskClient`] connected to this manager's channels.
    ///
    /// If the manager is named, the client is named `{name}-child`.
    ///
    /// # Panics
    ///
    /// Panics if the manager's own notify receiver has already been released by
    /// `wait_for_shutdown` or `wait_for_graceful_shutdown`.
    pub fn subscribe(&self) -> TaskClient {
        let task_client = TaskClient::new(
            self.notify_rx
                .as_ref()
                .expect("Unable to subscribe to shutdown notifier that is already shutdown")
                .clone(),
            self.task_return_error_tx.clone(),
            self.task_drop_tx.clone(),
            self.task_status_tx.clone(),
        );

        if let Some(name) = &self.name {
            task_client.named(format!("{name}-child"))
        } else {
            task_client
        }
    }

    /// Like `subscribe`, but names the client `{name}-{suffix}` (or `unknown-{suffix}` when the
    /// manager is unnamed).
    ///
    /// # Panics
    ///
    /// Under the same conditions as `subscribe`.
    pub fn subscribe_named<S: Into<String>>(&self, suffix: S) -> TaskClient {
        let task_client = self.subscribe();
        let suffix = suffix.into();
        let child_name = if let Some(base) = &self.name {
            format!("{base}-{suffix}")
        } else {
            format!("unknown-{suffix}")
        };
        task_client.named(child_name)
    }

    /// Notifies every subscribed client that it is time to shut down.
    ///
    /// # Errors
    ///
    /// Returns the `watch` send error if no receivers remain.
    pub fn signal_shutdown(&self) -> Result<(), SendError<()>> {
        self.notify_tx.send(())
    }

    /// Sends `start_status` through `sender`, then (only on the first call) spawns a background
    /// task that forwards every status message reported by task clients to `sender` until the
    /// status channel closes. Send failures are logged and otherwise ignored.
    pub async fn start_status_listener(
        &mut self,
        mut sender: StatusSender,
        start_status: TaskStatus,
    ) {
        // Announce that we are operational. This means that in the application where this is used,
        // everything is up and running and ready to go.
        if let Err(msg) = sender.send(Box::new(start_status)).await {
            log::error!("Error sending status message: {msg}");
        }

        if let Some(mut task_status_rx) = self.task_status_rx.take() {
            log::info!("Starting status message listener");
            crate::spawn::spawn_future(async move {
                while let Some(msg) = task_status_rx.next().await {
                    log::trace!("Got msg: {msg}");
                    if let Err(msg) = sender.send(msg).await {
                        log::error!("Error sending status message: {msg}");
                    }
                }
                log::trace!("Stopping since channel closed");
                log::debug!("Status listener: Exiting");
            });
        }
    }

    // used for compatibility with the ShutdownManager
    #[cfg(not(target_arch = "wasm32"))]
    pub(crate) fn task_return_error_rx(&mut self) -> ErrorReceiver {
        self.task_return_error_rx
            .take()
            .expect("unable to get error channel: attempt to wait twice?")
    }

    // used for compatibility with the ShutdownManager
    #[cfg(not(target_arch = "wasm32"))]
    pub(crate) fn task_drop_rx(&mut self) -> ErrorReceiver {
        self.task_drop_rx
            .take()
            .expect("unable to get task drop channel: attempt to wait twice?")
    }

    /// Waits for the first error reported by a task. The error is either sent explicitly (via
    /// `TaskClient::send_we_stopped`) or reported by a client dropped before shutdown.
    ///
    /// Returns `None` only if the channel that completed first is closed.
    ///
    /// # Panics
    ///
    /// Panics if either error receiver has already been taken (e.g. when called twice).
    pub async fn wait_for_error(&mut self) -> Option<SentError> {
        let mut error_rx = self
            .task_return_error_rx
            .take()
            .expect("Unable to wait for error: attempt to wait twice?");
        let mut drop_rx = self
            .task_drop_rx
            .take()
            .expect("Unable to wait for error: attempt to wait twice?");

        // During an error we are likely to be swamped with drop notifications as well, so this
        // is a crude way to give priority to real errors (if there are any): drop notifications
        // are only delivered after an additional 50ms delay.
        let drop_rx = drop_rx.recv().then(|msg| async move {
            sleep(Duration::from_millis(50)).await;
            msg
        });

        tokio::select! {
            msg = error_rx.recv() => msg,
            msg = drop_rx => msg
        }
    }

    /// Releases the manager's own notify receiver and waits, without a timeout, until every
    /// subscribed client has dropped its receiver.
    pub async fn wait_for_graceful_shutdown(&mut self) {
        // Drop our own receiver so that `closed()` only waits for the subscribed clients.
        self.notify_rx = None;
        self.notify_tx.closed().await
    }

    /// Releases the manager's own notify receiver and waits until one of the following happens:
    /// all subscribed clients are gone, ctrl-c is pressed (native targets only), or
    /// `shutdown_timer_secs` seconds elapse. It does not signal shutdown by itself.
    pub async fn wait_for_shutdown(&mut self) {
        log::debug!("Waiting for shutdown");
        // Drop our own receiver so that `closed()` only waits for the subscribed clients.
        self.notify_rx = None;

        #[cfg(not(target_arch = "wasm32"))]
        let interrupt_future = tokio::signal::ctrl_c();

        #[cfg(target_arch = "wasm32")]
        let interrupt_future = futures::future::pending::<()>();

        let wait_future = sleep(Duration::from_secs(self.shutdown_timer_secs));

        tokio::select! {
            _ = self.notify_tx.closed() => {
                log::info!("All registered tasks successfully shutdown");
            },
            _ = interrupt_future => {
                log::info!("Forcing shutdown");
            }
            _ = wait_future => {
                log::info!("Timeout reached, forcing shutdown");
            },
        }
    }
}

/// Listens for shutdown notifications, and can send error and status messages back to the
/// `TaskManager` it was subscribed from.
#[derive(Debug)]
#[deprecated(note = "use ShutdownToken instead")]
pub struct TaskClient {
    // optional name assigned to the shutdown handle (used as the logging target prefix)
    name: Option<String>,

    // Whether a shutdown notification has already been observed.
    // The reason for having an atomic here is to be able to cheat and modify that value whilst
    // holding an immutable reference to the `TaskClient`.
    // note: using `Relaxed` ordering everywhere is fine since it's not shared between threads
    // (each clone gets its own `AtomicBool`).
    shutdown: AtomicBool,

    // Listen for shutdown notifications, as well as a mechanism to report back that we have
    // finished (the receiver is closed when the client is dropped).
    notify: watch::Receiver<()>,

    // Send back the error if we stopped
    return_error: ErrorSender,

    // Also notify if we got dropped without a shutdown being registered
    drop_error: ErrorSender,

    // Send non-exit (status) messages
    status_msg: StatusSender,

    // The current operating mode (listening, listening without drop reports, or dummy)
    mode: ClientOperatingMode,
}

#[allow(deprecated)]
impl Clone for TaskClient {
    /// Produces an independent client that shares this one's channels, mode and current
    /// shutdown flag; a named client's copy is called `{name}-child`.
    fn clone(&self) -> Self {
        // Every clone appends "-child", so bound the growth of the name: once it reaches
        // MAX_NAME_LENGTH (or already is the overflow marker), collapse it to OVERFLOW_NAME.
        let name = self.name.as_ref().map(|parent| {
            let has_room = parent.len() < Self::MAX_NAME_LENGTH;
            if has_room && parent != Self::OVERFLOW_NAME {
                format!("{parent}-child")
            } else {
                Self::OVERFLOW_NAME.to_string()
            }
        });

        log::debug!("Cloned task client: {name:?}");

        Self {
            name,
            shutdown: AtomicBool::new(self.shutdown.load(Ordering::Relaxed)),
            mode: self.mode.clone(),
            notify: self.notify.clone(),
            return_error: self.return_error.clone(),
            drop_error: self.drop_error.clone(),
            status_msg: self.status_msg.clone(),
        }
    }
}

#[allow(deprecated)]
impl TaskClient {
    const MAX_NAME_LENGTH: usize = 128;
    const OVERFLOW_NAME: &'static str = "reached maximum TaskClient children name depth";

    const SHUTDOWN_TIMEOUT_WAITING_FOR_SIGNAL_ON_EXIT: Duration = Duration::from_secs(10);

    /// Builds an unnamed, listening client on top of the given channel endpoints.
    fn new(
        notify: watch::Receiver<()>,
        return_error: ErrorSender,
        drop_error: ErrorSender,
        status_msg: StatusSender,
    ) -> TaskClient {
        Self {
            name: None,
            shutdown: AtomicBool::new(false),
            mode: ClientOperatingMode::Listening,
            notify,
            return_error,
            drop_error,
            status_msg,
        }
    }

    /// Clones this client and names the copy `{name}-{child_suffix}`, or
    /// `unknown-{child_suffix}` when this client is unnamed.
    // TODO: not convinced about the name...
    pub fn fork<S: Into<String>>(&self, child_suffix: S) -> Self {
        let mut forked = self.clone();
        let suffix: String = child_suffix.into();
        let parent = self.name.as_deref().unwrap_or("unknown");
        let forked_name = format!("{parent}-{suffix}");

        log::debug!("Forked task client: {forked_name}");
        forked.name = Some(forked_name);
        forked
    }

    // Convenience wrapper that tags every message with this client's name, both as the log
    // target and as a `[target]` prefix. A macro would avoid the `format!` calls at call sites,
    // but seemed like overkill.
    fn log<S: Into<String>>(&self, level: Level, msg: S) {
        let msg: String = msg.into();
        let target = &match &self.name {
            Some(name) => format!("TaskClient-{name}"),
            None => "unnamed-TaskClient".to_string(),
        };

        log!(target: target, level, "[{target}] {msg}")
    }

    /// Replaces this client's name.
    #[must_use]
    pub fn named<S: Into<String>>(mut self, name: S) -> Self {
        self.name = Some(name.into());
        self
    }

    /// Renames this client to `{name}-{suffix}`, or `unknown-{suffix}` when it is unnamed.
    #[must_use]
    pub fn with_suffix<S: Into<String>>(self, suffix: S) -> Self {
        let suffix: String = suffix.into();
        let new_name = match &self.name {
            Some(base) => format!("{base}-{suffix}"),
            None => format!("unknown-{suffix}"),
        };
        log::debug!("Renamed task client: {new_name}");
        self.named(new_name)
    }

    /// Creates a dummy client that will never report that we should shut down. In dummy mode
    /// nothing is sent back either, and nothing is reported on drop.
    pub fn dummy() -> TaskClient {
        let (_notify_tx, notify) = watch::channel(());
        let (return_error, _return_error_rx) = mpsc::unbounded_channel();
        let (drop_error, _drop_error_rx) = mpsc::unbounded_channel();
        let (status_msg, _status_msg_rx) = futures::channel::mpsc::channel(128);
        Self {
            name: None,
            shutdown: AtomicBool::new(false),
            mode: ClientOperatingMode::Dummy,
            notify,
            return_error,
            drop_error,
            status_msg,
        }
    }

    /// Whether this client was created with [`TaskClient::dummy`].
    pub fn is_dummy(&self) -> bool {
        self.mode.is_dummy()
    }

    /// Whether a shutdown has already been observed (always `false` for a dummy client).
    /// This only reads the cached flag; see `is_shutdown_poll` for a check of the channel.
    pub fn is_shutdown(&self) -> bool {
        !self.mode.is_dummy() && self.shutdown.load(Ordering::Relaxed)
    }

    /// Resolves once a shutdown notification arrives, or immediately if one was already seen.
    /// If the notifier has been dropped, that also counts as a shutdown. Never resolves for a
    /// dummy client.
    pub async fn recv(&mut self) {
        if self.mode.is_dummy() {
            return pending().await;
        }
        if !self.shutdown.load(Ordering::Relaxed) {
            // An error here means the sender is gone, which we treat as a shutdown as well.
            let _ = self.notify.changed().await;
            self.shutdown.store(true, Ordering::Relaxed);
        }
    }

    /// Same as `recv`, followed by an additional two second delay.
    pub async fn recv_with_delay(&mut self) {
        self.recv().await;
        sleep(Duration::from_secs(2)).await;
    }

    /// Same as `recv`, but panics if no shutdown is observed within
    /// `SHUTDOWN_TIMEOUT_WAITING_FOR_SIGNAL_ON_EXIT`. Never resolves for a dummy client.
    // legacy code
    #[allow(clippy::panic)]
    pub async fn recv_timeout(&mut self) {
        if self.mode.is_dummy() {
            return pending().await;
        }

        let outcome = timeout(
            Self::SHUTDOWN_TIMEOUT_WAITING_FOR_SIGNAL_ON_EXIT,
            self.recv(),
        )
        .await;

        if let Err(elapsed) = outcome {
            self.log(Level::Error, "Task stopped without shutdown called");
            panic!("{:?}: {elapsed}", self.name)
        }
    }

    /// Non-blocking shutdown check. It caches a newly observed notification in the shutdown
    /// flag. If the notifier can no longer be polled, it logs an error and returns `true`.
    /// Always `false` for a dummy client.
    pub fn is_shutdown_poll(&self) -> bool {
        if self.mode.is_dummy() {
            return false;
        }
        if self.shutdown.load(Ordering::Relaxed) {
            return true;
        }

        match self.notify.has_changed() {
            Ok(true) => {
                self.shutdown.store(true, Ordering::Relaxed);
                true
            }
            Ok(false) => false,
            Err(err) => {
                self.log(Level::Error, format!("Polling shutdown failed: {err}"));
                self.log(Level::Error, "Assuming this means we should shutdown...");

                true
            }
        }
    }

    // After this call, the listener will *not* report an unexpected halt to the TaskManager when
    // dropped. For example, when we clone the listener for a task handling connections, we often
    // want to drop it without signalling failure. Has no effect on a dummy client.
    pub fn disarm(&mut self) {
        self.mode.set_should_not_signal_on_drop();
    }

    // Undo `disarm`: report an unexpected halt again if dropped before shutdown.
    // Has no effect on a dummy client.
    pub fn rearm(&mut self) {
        self.mode.set_should_signal_on_drop();
    }

    /// Reports `err` to the manager as the reason this task stopped. The error is silently
    /// discarded for a dummy client.
    pub fn send_we_stopped(&mut self, err: SentError) {
        if self.mode.is_dummy() {
            return;
        }

        self.log(Level::Trace, format!("Notifying we stopped: {err}"));

        let delivered = self.return_error.send(err).is_ok();
        if !delivered {
            self.log(Level::Error, "failed to send back error message");
        }
    }

    /// Best-effort delivery of a non-fatal status message; a full or closed channel is ignored.
    pub fn send_status_msg(&mut self, msg: SentStatus) {
        if !self.mode.is_dummy() {
            // Since it's not always the case that anyone is listening, just try to send and
            // ignore any failures.
            let _ = self.status_msg.try_send(msg);
        }
    }
}

#[allow(deprecated)]
impl Drop for TaskClient {
    /// When an armed client is dropped before it observed a shutdown, it reports an
    /// `UnexpectedHalt` on the drop channel. Disarmed and dummy clients only log.
    fn drop(&mut self) {
        if !self.mode.should_signal_on_drop() {
            self.log(
                Level::Trace,
                "the task client is getting dropped but instructed to not signal: this is expected during client shutdown",
            );
            return;
        }

        self.log(
            Level::Debug,
            "the task client is getting dropped: this is expected during client shutdown",
        );

        if self.is_shutdown_poll() {
            return;
        }

        self.log(Level::Trace, "Notifying stop on unexpected drop");
        let halt = TaskError::UnexpectedHalt {
            shutdown_name: self.name.clone(),
        };
        // If we can't send, well then there is not much to do
        let _ = self.drop_error.send(Box::new(halt));
    }
}

/// How a `TaskClient` behaves: whether it listens for shutdown at all, and whether it reports
/// an unexpected halt when dropped.
#[derive(Clone, Debug, PartialEq, Eq)]
enum ClientOperatingMode {
    // Normal operations: listen for shutdown, and report an unexpected halt if dropped before
    // a shutdown was observed.
    Listening,
    // Normal operations, but we don't report back if we stop by getting dropped.
    ListeningButDontReportHalt,
    // Dummy mode, for when we don't do anything at all: never observes a shutdown, never sends
    // anything back, and is not changed by disarm/rearm.
    Dummy,
}

impl ClientOperatingMode {
    fn is_dummy(&self) -> bool {
        self == &ClientOperatingMode::Dummy
    }

    fn should_signal_on_drop(&self) -> bool {
        match self {
            ClientOperatingMode::Listening => true,
            ClientOperatingMode::ListeningButDontReportHalt | ClientOperatingMode::Dummy => false,
        }
    }

    fn set_should_signal_on_drop(&mut self) {
        use ClientOperatingMode::{Dummy, Listening, ListeningButDontReportHalt};
        *self = match &self {
            ListeningButDontReportHalt | Listening => Listening,
            Dummy => Dummy,
        };
    }

    fn set_should_not_signal_on_drop(&mut self) {
        use ClientOperatingMode::{Dummy, Listening, ListeningButDontReportHalt};
        *self = match &self {
            ListeningButDontReportHalt | Listening => ListeningButDontReportHalt,
            Dummy => Dummy,
        };
    }
}

/// Either owns the shutdown machinery (a [`TaskManager`]) or is driven by an external
/// controller through a [`TaskClient`]. Either way, callers obtain task clients through
/// the same methods (`fork`, `get_handle`).
#[deprecated]
#[allow(deprecated)]
#[derive(Debug)]
pub enum TaskHandle {
    /// Full [`TaskManager`] that was created by the underlying task.
    Internal(TaskManager),

    /// [`TaskClient`] that was passed in from an external task, which controls the shutdown
    /// process.
    External(TaskClient),
}

#[allow(deprecated)]
impl From<TaskManager> for TaskHandle {
    fn from(value: TaskManager) -> Self {
        TaskHandle::Internal(value)
    }
}

#[allow(deprecated)]
impl From<TaskClient> for TaskHandle {
    fn from(value: TaskClient) -> Self {
        TaskHandle::External(value)
    }
}

#[allow(deprecated)]
impl Default for TaskHandle {
    fn default() -> Self {
        TaskHandle::Internal(TaskManager::default())
    }
}

#[allow(deprecated)]
impl TaskHandle {
    #[must_use]
    pub fn name_if_unnamed<S: Into<String>>(self, name: S) -> Self {
        match self {
            TaskHandle::Internal(task_manager) => {
                if task_manager.name.is_none() {
                    TaskHandle::Internal(task_manager.named(name))
                } else {
                    TaskHandle::Internal(task_manager)
                }
            }
            TaskHandle::External(task_client) => {
                if task_client.name.is_none() {
                    TaskHandle::External(task_client.named(name))
                } else {
                    TaskHandle::External(task_client)
                }
            }
        }
    }

    #[must_use]
    pub fn named<S: Into<String>>(self, name: S) -> Self {
        match self {
            TaskHandle::Internal(task_manager) => TaskHandle::Internal(task_manager.named(name)),
            TaskHandle::External(task_client) => TaskHandle::External(task_client.named(name)),
        }
    }

    pub fn fork<S: Into<String>>(&self, child_suffix: S) -> TaskClient {
        match self {
            TaskHandle::External(shutdown) => shutdown.fork(child_suffix),
            TaskHandle::Internal(shutdown) => shutdown.subscribe_named(child_suffix),
        }
    }

    pub fn get_handle(&self) -> TaskClient {
        match self {
            TaskHandle::External(shutdown) => shutdown.clone(),
            TaskHandle::Internal(shutdown) => shutdown.subscribe(),
        }
    }

    pub fn try_into_task_manager(self) -> Option<TaskManager> {
        match self {
            TaskHandle::External(_) => None,
            TaskHandle::Internal(shutdown) => Some(shutdown),
        }
    }

    #[cfg(not(target_arch = "wasm32"))]
    pub async fn wait_for_shutdown(self) -> Result<(), SentError> {
        match self {
            TaskHandle::Internal(mut task_manager) => task_manager.catch_interrupt().await,
            TaskHandle::External(mut task_client) => {
                task_client.recv().await;
                Ok(())
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A subscribed client's `recv` must resolve once the manager signals shutdown.
    #[tokio::test]
    #[allow(deprecated)]
    async fn signal_shutdown() {
        let manager = TaskManager::default();
        let mut client = manager.subscribe();

        let waiter = tokio::spawn(async move {
            client.recv().await;
            42
        });

        manager.signal_shutdown().unwrap();
        assert_eq!(waiter.await.unwrap(), 42);
    }
}