// paladin/runtime/mod.rs

1//! Utility for managing the routing of distributed tasks and their results.
2//!
3//! This module provides utilities for orchestrating distributed task execution,
4//! focusing on the interplay between operations, their invocations as tasks,
5//! and higher-order directives that manage these tasks.
6//!
7//! It provides two runtimes:
8//! - [`Runtime`]: Used by the orchestrator to manage the execution of tasks.
9//! - [`WorkerRuntime`]: Used by worker nodes to execute tasks.
10//!
11//! # Semantic Overview
12//!
13//! - [`Operation`]: Defines the signature and semantics of a computation. Akin
14//!   to a function that maps an input to an output.
15//! - [`Task`]: Represents the invocation of an operation with specific
16//!   arguments. The payloads used for coordination.
17//! - [`Directive`](crate::directive::Directive): Manages and orchestrates the
18//!   results of multiple tasks, implementing higher-order semantics.
19//!
20//! In the context of our system, we assume:
21//! - [`Operation`]s are expensive as they usually encode hefty computations.
22//! - [`Directive`](crate::directive::Directive)s are cheap since they primarily
23//!   manage and coordinate the results of those computations.
24//!
25//! A key concept is the information asymmetry between
26//! [`Directive`](crate::directive::Directive)s and [`Task`]s:
27//! - A [`Directive`](crate::directive::Directive) knows about the [`Task`]s it
28//!   manages.
29//! - A [`Task`], however, is oblivious to any
30//!   [`Directive`](crate::directive::Directive) overseeing it.
31//!
32//! This asymmetry highlights distinct channeling requirements:
33//! - [`Directive`](crate::directive::Directive)s require a discrete homogenous
34//!   stream of results.
35//! - [`Task`] channels must accommodate arbitrary operations, listening on a
36//!   single, heterogenous stream of results.
37//!
38//! Practically, this is implemented by using a single, stable channel for
39//! [`Task`]s, and dynamically issuing new channels in context of each
40//! [`Directive`](crate::directive::Directive)'s evaluation. This allows workers
41//! to listen on a single channel and thus execute arbitrary [`Task`]s, while
42//! [`Directive`](crate::directive::Directive)s can listen on an isolated
43//! channel for their results.
44//!
//! The [`Runtime`] implements the semantics of this architecture.
46//! Fundamentally, the [`Runtime`] is responsible for managing the channels that
47//! [`Task`]s and [`Directive`](crate::directive::Directive)s use to
48//! communicate, and provides a simple interface for interacting with these
49//! channels.
50
51use std::{
52    sync::Arc,
53    time::{Duration, Instant},
54};
55
56use anyhow::Result;
57use dashmap::{mapref::entry::Entry, DashMap};
58use futures::{stream::BoxStream, Stream, StreamExt};
59use serde::{Deserialize, Serialize};
60use tokio::{select, task::JoinHandle, try_join};
61use tracing::{debug_span, error, instrument, trace, warn, Instrument};
62
63use self::dynamic_channel::{DynamicChannel, DynamicChannelFactory};
64use crate::{
65    acker::{Acker, ComposedAcker},
66    channel::{
67        coordinated_channel::coordinated_channel, Channel, ChannelFactory, ChannelType, LeaseGuard,
68    },
69    common::get_random_routing_key,
70    config::Config,
71    operation::{marker::Marker, FatalStrategy, Operation},
72    queue::{Publisher, PublisherExt},
73    serializer::{Serializable, Serializer},
74    task::{AnyTask, AnyTaskOutput, AnyTaskResult, Task, TaskResult},
75};
76
/// A boxed stream of `(Item, Acker)` pairs: each received item is paired with
/// the acker used to acknowledge (or reject) its delivery.
type Receiver<'a, Item> = Box<dyn Stream<Item = (Item, Box<dyn Acker>)> + Send + Unpin + 'a>;
/// A boxed publisher used to dispatch `Item`s onto a channel.
type Sender<'a, Item> = Box<dyn Publisher<Item> + Send + Unpin + Sync + 'a>;
/// The triple returned by [`Runtime::lease_coordinated_task_channel`]:
/// the leased result-channel identifier (used as the task routing key), the
/// task sender, and the lease-guarded result receiver.
type CoordinatedTaskChannel<'a, Op, Metadata> = (
    String,
    Sender<'a, Task<'a, Op, Metadata>>,
    LeaseGuard<DynamicChannel, Receiver<'a, TaskResult<Op, Metadata>>>,
);
84
85/// The core of the distributed task management system.
86///
87/// Fundamentally, the [`Runtime`] is responsible for managing the channels that
88/// [`Task`]s and [`Directive`](crate::directive::Directive)s use to
89/// communicate, and provides a simple interface for interacting with these
90/// channels.
91///
92/// This runtime should be used in an orchestration context, where a single node
93/// is responsible for managing the execution of tasks. This is in contrast to
94/// the [`WorkerRuntime`], which is used by worker nodes that execute tasks.
95///
96/// The primary purpose of this runtime is to facilitate the instantiation of
97/// new coordination channels for dispatching [`Task`]s and receiving their
98/// associated [`TaskResult`]s. It takes care of namespacing new result channels
99/// such that users can consume them in an isolated manner.
100/// [`Directive`](crate::directive::Directive)s will use this to create a siloed
101/// channel upon which they can listen to only the results of the [`Task`]s they
102/// manage. It also takes care of synchronizing disparate sender and receiver
103/// channels such that closing the sender terminates the receiver.
104///
105/// ## Emulation
106/// The main [`Runtime`] provides emulation functionality for the worker. This
107/// can be enabled by passing the
108/// [`config::Runtime::InMemory`](crate::config::Runtime::InMemory) field as
109/// part of the configuration to [`Runtime::from_config`]. This will spin up a
110/// multi-threaded worker emulator that will execute multiple [`WorkerRuntime`]s
111/// concurrently. This can be used to simulate a distributed environment
112/// in-memory, and finds immediate practical use in writing tests.
113///
114/// See the [runtime module documentation](crate::runtime) for more information
115/// on runtime semantics.
116
117pub struct Runtime {
118    channel_factory: DynamicChannelFactory,
119    task_channel: DynamicChannel,
120    serializer: Serializer,
121    worker_emulator: Option<Vec<JoinHandle<Result<()>>>>,
122    _marker: Marker,
123}
124
/// Routing key of the broadcast channel used for worker-to-worker IPC messages.
const IPC_ROUTING_KEY: &str = "ipc-routing-key";
/// Fallback task-bus routing key used when `task_bus_routing_key` is unset.
pub const DEFAULT_ROUTING_KEY: &str = "default";
127
impl Runtime {
    /// Initializes the [`Runtime`] with the provided [`Config`].
    pub async fn from_config(config: &Config, marker: Marker) -> Result<Self> {
        let channel_factory = DynamicChannelFactory::from_config(config).await?;
        // Acquire the single, stable task channel on which all tasks are
        // dispatched; workers listen on the same routing key.
        let task_channel = channel_factory
            .get(
                config
                    .task_bus_routing_key
                    .clone()
                    .unwrap_or_else(|| DEFAULT_ROUTING_KEY.to_string()),
                ChannelType::ExactlyOnce,
            )
            .await?;
        let serializer = Serializer::from(config);

        // Spin up an emulator for the worker runtime if we're running in-memory.
        let worker_emulator = match config.runtime {
            crate::config::Runtime::InMemory => Some(Self::spawn_emulator(
                channel_factory.clone(),
                task_channel.clone(),
                // Defaults to 3 emulated workers when unspecified.
                config.num_workers.unwrap_or(3),
            )),
            _ => None,
        };

        Ok(Self {
            channel_factory,
            task_channel,
            serializer,
            worker_emulator,
            _marker: marker,
        })
    }

    /// Short-hand for initializing an in-memory [`Runtime`].
    pub async fn in_memory() -> Result<Self> {
        let config = Config {
            runtime: crate::config::Runtime::InMemory,
            ..Default::default()
        };
        Self::from_config(&config, Marker).await
    }

    /// Spawns an emulator for the worker runtime.
    ///
    /// This is used to emulate the worker runtime when running in-memory.
    fn spawn_emulator(
        channel_factory: DynamicChannelFactory,
        task_channel: DynamicChannel,
        num_threads: usize,
    ) -> Vec<JoinHandle<Result<()>>> {
        // One independent worker main loop per emulated worker, each with its
        // own clones of the shared channel handles.
        (0..num_threads)
            .map(|_| {
                let channel_factory = channel_factory.clone();
                let task_channel = task_channel.clone();
                tokio::spawn(async move {
                    let worker_runtime = WorkerRuntime {
                        channel_factory,
                        task_channel,
                        _marker: Marker,
                    };
                    worker_runtime.main_loop().await?;
                    Ok(())
                })
            })
            .collect()
    }

    /// Closes the underlying task channel.
    pub async fn close(&self) -> Result<()> {
        self.task_channel.close().await
    }

    /// Provides a [`Publisher`] for dispatching [`Task`]s of the specified
    /// [`Operation`] and its associated `Metadata`.
    ///
    /// This generally shouldn't be used by itself if the caller is interested
    /// in receiving the results associated with dispatched [`Task`]s.
    /// Instead, consider using
    /// [`lease_coordinated_task_channel`](Self::lease_coordinated_task_channel),
    /// which uses this function internally and provides additional
    /// coordination guarantees.
    ///
    /// # Metadata
    /// `Metadata` is a generic parameter that implements [`Serializable`], used
    /// to encode additional information into a [`Task`] that is relevant to
    /// the particular [`Directive`](crate::directive::Directive) orchestrating
    /// it. This can be useful in cases where a
    /// [`Directive`](crate::directive::Directive) needs to coordinate the
    /// results of multiple [`Task`]s.
    #[instrument(skip_all, level = "debug")]
    async fn get_task_sender<'a, Op: Operation + 'a, Metadata: Serializable + 'a>(
        &self,
    ) -> Result<Sender<'a, Task<'a, Op, Metadata>>> {
        // Get a publisher for the task channel, which accepts `AnyTask`.
        let sender = self.task_channel.sender::<AnyTask>().await?;
        let serializer = self.serializer;
        // Transform the sender to accept typed `Task`s by serializing them into
        // `AnyTask`s on behalf of the caller. This allows the caller to pass in
        // a typed `Task` without having to worry about serialization.
        let transformed_sender =
            sender.with(move |task: &Task<'_, Op, Metadata>| task.as_any_task(serializer));

        Ok(Box::new(transformed_sender))
    }

    /// Leases a new discrete
    /// [coordinated](crate::channel::coordinated_channel::coordinated_channel)
    /// channel for asynchronously dispatching [`Task`]s and receiving their
    /// [`TaskResult`]s.
    ///
    /// # Design notes
    ///
    /// - The returned receiver is wrapped in a [`LeaseGuard`] that will release
    ///   the leased [`Channel`] once dropped.
    /// - A unique identifier is returned for the newly leased [`Channel`] that
    ///   is bound to the receiver. This can be used to route [`TaskResult`]s
    ///   back to that channel such that they're propagated to provided
    ///   receiver.
    /// - Even though the `sender` and `receiver` interact with two distinct
    ///   channels, they are bound by
    ///   [`coordinated_channel`](crate::channel::coordinated_channel::coordinated_channel),
    ///   which ensures that open/closed state and pending message state are
    ///   synchronized between them.
    /// - The provided receiver automatically deserializes [`AnyTaskResult`]s
    ///   into typed [`TaskResult`]s on behalf of the caller.
    ///
    /// Users should take care to close the sender once all tasks have been
    /// dispatched. This will ensure that the receiver is terminated once
    /// all results have been received. Without this, the receiver will block
    /// indefinitely. Alternatively, returning early without closing the
    /// sender will be fine too, as the sender will be closed once dropped.
    ///
    /// # Example
    /// ```
    /// use paladin::{
    ///     RemoteExecute,
    ///     runtime::Runtime,
    ///     task::Task,
    ///     operation::{Operation, Result},
    /// };
    /// use serde::{Deserialize, Serialize};
    /// use futures::StreamExt;
    ///
    /// #[derive(Serialize, Deserialize)]
    /// struct Metadata {
    ///     id: usize,
    /// }
    /// # #[derive(Serialize, Deserialize, RemoteExecute)]
    /// # struct StringLength;
    /// # impl Operation for StringLength {
    /// #    type Input = String;
    /// #    type Output = usize;
    /// #
    /// #    fn execute(&self, input: Self::Input) -> Result<Self::Output> {
    /// #       Ok(input.len())
    /// #    }
    /// # }
    ///
    /// async fn run_task<Op: Operation>(op: &Op, runtime: &Runtime, input: Op::Input) -> anyhow::Result<()> {
    ///     let (identifier, mut sender, mut receiver) = runtime.lease_coordinated_task_channel::<Op, Metadata>().await?;
    ///
    ///     // Issue a task with the identifier of the receiver
    ///     sender.publish(&Task {
    ///         routing_key: identifier,
    ///         metadata: Metadata { id: 0 },
    ///         op,
    ///         input,
    ///     })
    ///     .await?;
    ///     sender.close().await?;
    ///
    ///
    ///     while let Some((result, acker)) = receiver.next().await {
    ///             // ... handle result
    ///     }
    ///
    ///     Ok(())
    /// }
    /// ```
    ///
    /// # Metadata
    /// `Metadata` is a generic parameter that implements [`Serializable`], used
    /// to encode additional information into a [`Task`] that is relevant to
    /// the particular [`Directive`](crate::directive::Directive) orchestrating
    /// it. This can be useful in cases where a
    /// [`Directive`](crate::directive::Directive) needs to coordinate the
    /// results of multiple [`Task`]s.
    #[instrument(skip_all, level = "debug")]
    pub async fn lease_coordinated_task_channel<
        'a,
        Op: Operation + 'a,
        Metadata: Serializable + 'a,
    >(
        &self,
    ) -> Result<CoordinatedTaskChannel<'a, Op, Metadata>> {
        // Issue a new channel and return its identifier paired with a stream of
        // results.
        let (task_sender, (result_channel_identifier, result_channel)) = try_join!(
            self.get_task_sender(),
            self.channel_factory.issue(ChannelType::ExactlyOnce)
        )?;

        // Transform the stream to deserialize the results into typed `TaskResult`s on
        // behalf of the caller. This allows the caller to receive typed
        // `TaskResult`s without having to worry about deserialization.
        let receiver = result_channel
            .receiver::<AnyTaskResult>()
            .await?
            .map(move |(result, acker)| (result.into_task_result::<Op, Metadata>(), acker));

        // Enable coordination between the task and result channels.
        let (sender, receiver) = coordinated_channel(task_sender, receiver);

        // The stream returned by `coordinated_channel` has its own `Acker` that is used
        // to keep track of the number of successfully processed results. This
        // is done to ensure the Stream can be properly drained when a Sender is closed.
        //
        // We wrap the original receiver's `Acker` in a `ComposedAcker` that will also
        // ack the coordinated Stream if it is successful. This prevents
        // out-of-sync issues where the `CoordinatedStream` removes a pending send from
        // its state, but the original message failed to ack.
        let ack_composed_receiver =
            receiver.map(|((result, original_acker), coordinated_acker)| {
                (
                    result,
                    Box::new(ComposedAcker::new(original_acker, coordinated_acker))
                        as Box<dyn Acker>,
                )
            });

        Ok((
            result_channel_identifier,
            Box::new(sender),
            // Wrap the stream in a `LeaseGuard` that will release the channel once dropped.
            LeaseGuard::new(result_channel, Box::new(ack_composed_receiver)),
        ))
    }
}
366
367/// Drop the worker emulator when the runtime is dropped.
368impl Drop for Runtime {
369    fn drop(&mut self) {
370        if let Some(worker_emulator) = self.worker_emulator.take() {
371            for handle in worker_emulator {
372                handle.abort();
373            }
374        }
375    }
376}
377
/// A runtime for worker nodes.
///
/// This runtime provides functionality for the three responsibilities of a
/// worker process.
/// 1. Listen for new [`Task`]s.
/// 2. Execute those [`Task`]s.
/// 3. Send back the results of a [`Task`] execution.
#[derive(Clone)]
pub struct WorkerRuntime {
    /// Issues channels on demand — used to reach result and IPC channels.
    channel_factory: DynamicChannelFactory,
    /// The shared channel from which tasks are received.
    task_channel: DynamicChannel,
    // NOTE(review): appears to witness operation registration (`registry!()` /
    // `register()` in the examples) — confirm against `operation::marker`.
    _marker: Marker,
}
391
/// A successful [`Task`] execution, dispatched back via [`WorkerRuntime::dispatch_ok`].
#[derive(Debug)]
pub struct ExecutionOk {
    /// Identifier of the result channel the output should be routed to.
    pub routing_key: String,
    /// The type-erased output of the operation.
    pub output: AnyTaskOutput,
}
397
/// A failed [`Task`] execution, dispatched back via [`WorkerRuntime::dispatch_fatal`].
#[derive(Debug)]
pub struct ExecutionErr<E> {
    /// Identifier of the result channel the error should be routed to.
    routing_key: String,
    /// The underlying execution error.
    err: E,
    /// How the runtime should react to the error (ignore vs. terminate).
    strategy: FatalStrategy,
}
404
/// Inter-process messages between workers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WorkerIpc {
    /// Broadcast when a task fails fatally so that other workers can skip or
    /// cancel in-flight tasks sharing the same `routing_key`.
    ExecutionError { routing_key: String },
}
410
411impl WorkerRuntime {
412    /// Initializes the [`WorkerRuntime`] with the provided [`Config`].
413    pub async fn from_config(config: &Config, marker: Marker) -> Result<Self> {
414        let channel_factory = DynamicChannelFactory::from_config(config).await?;
415        let task_channel = channel_factory
416            .get(
417                config
418                    .task_bus_routing_key
419                    .clone()
420                    .unwrap_or_else(|| DEFAULT_ROUTING_KEY.to_string()),
421                ChannelType::ExactlyOnce,
422            )
423            .await?;
424
425        Ok(Self {
426            channel_factory,
427            task_channel,
428            _marker: marker,
429        })
430    }
431
432    /// Provides a [`Publisher`] for dispatching [`AnyTaskResult`]s.
433    ///
434    /// Typically used by a worker node for send back the results of a [`Task`]
435    /// execution.
436    #[instrument(skip(self), level = "trace")]
437    pub async fn get_result_sender(&self, identifier: String) -> Result<Sender<AnyTaskResult>> {
438        self.channel_factory
439            .get(identifier, ChannelType::ExactlyOnce)
440            .await?
441            .sender::<AnyTaskResult>()
442            .await
443    }
444
445    /// Get a [`Publisher`] for dispatching [`WorkerIpc`] messages.
446    ///
447    /// Typically used by a worker node to notify other workers of a fatal
448    /// error.
449    #[instrument(skip(self), level = "trace")]
450    pub async fn get_ipc_sender(&self) -> Result<Sender<WorkerIpc>> {
451        self.channel_factory
452            .get(IPC_ROUTING_KEY.to_string(), ChannelType::Broadcast)
453            .await?
454            .sender()
455            .await
456    }
457
458    /// Get a [`Stream`] for receiving [`WorkerIpc`] messages.
459    ///
460    /// Typically used by a worker node to listen for fatal errors from other
461    /// workers.
462    #[instrument(skip(self), level = "trace")]
463    pub async fn get_ipc_receiver(&self) -> Result<BoxStream<'static, WorkerIpc>> {
464        let s = self
465            .channel_factory
466            .get(IPC_ROUTING_KEY.to_string(), ChannelType::Broadcast)
467            .await?
468            .receiver::<WorkerIpc>()
469            .await?;
470
471        Ok(s.then(|(message, acker)| async move {
472            // auto-ack
473            _ = acker.ack().await;
474            message
475        })
476        .boxed())
477    }
478
479    /// Provides a [`Stream`] incoming [`AnyTask`]s.
480    ///
481    /// Typically the the worker node's first interaction with the [`Runtime`].
482    /// This is how workers receive [`Task`]s for remote execution.
483    ///
484    /// # Example
485    /// ```no_run
486    /// use clap::Parser;
487    /// use paladin::{
488    ///     RemoteExecute,
489    ///     config::Config,
490    ///     registry,
491    ///     runtime::WorkerRuntime,
492    ///     operation::{Operation, Result},
493    /// };
494    /// use serde::{Deserialize, Serialize};
495    /// use futures::StreamExt;
496    /// #
497    /// # #[derive(Serialize, Deserialize, RemoteExecute)]
498    /// # struct StringLength;
499    /// #
500    /// # impl Operation for StringLength {
501    /// #    type Input = String;
502    /// #    type Output = usize;
503    /// #
504    /// #    fn execute(&self, input: Self::Input) -> Result<Self::Output> {
505    /// #        Ok(input.len())
506    /// #    }
507    /// # }
508    /// #
509    ///
510    /// #[derive(Parser, Debug)]
511    /// pub struct Cli {
512    ///     #[command(flatten)]
513    ///     pub options: Config,
514    /// }
515    ///
516    /// paladin::registry!();
517    ///
518    /// #[tokio::main]
519    /// async fn main() -> anyhow::Result<()> {
520    ///     let args = Cli::parse();
521    ///     let runtime = WorkerRuntime::from_config(&args.options, register()).await?;
522    ///
523    ///     let mut task_stream = runtime.get_task_receiver().await?;
524    ///     while let Some((task, delivery)) = task_stream.next().await {
525    ///         // ... handle task
526    ///     }
527    /// #  Ok(())
528    /// }
529    /// ```
530    #[instrument(skip_all, level = "trace")]
531    pub async fn get_task_receiver(&self) -> Result<Receiver<AnyTask>> {
532        self.task_channel.receiver().await
533    }
534
535    /// Notify the [`Runtime`] of a fatal error.
536    ///
537    /// This will pass the error back to the consumer of the [`Task`] and notify
538    /// other workers of the error.
539    #[instrument(skip(self), level = "trace")]
540    pub async fn dispatch_fatal<E>(
541        &self,
542        ExecutionErr {
543            routing_key,
544            err,
545            strategy,
546        }: ExecutionErr<E>,
547    ) -> Result<()>
548    where
549        E: std::fmt::Display + std::fmt::Debug,
550    {
551        match strategy {
552            FatalStrategy::Ignore => Ok(()),
553            FatalStrategy::Terminate => {
554                // Notify other workers of the error.
555                let (ipc, sender) = try_join!(
556                    self.get_ipc_sender(),
557                    self.get_result_sender(routing_key.clone())
558                )?;
559
560                let ipc_msg = WorkerIpc::ExecutionError { routing_key };
561                let sender_msg = AnyTaskResult::Err(err.to_string());
562
563                try_join!(ipc.publish(&ipc_msg), sender.publish(&sender_msg))?;
564                try_join!(ipc.close(), sender.close())?;
565
566                Ok(())
567            }
568        }
569    }
570
571    /// Notify the [`Runtime`] of a successful execution.
572    ///
573    /// This will pass the output back to the consumer of the [`Task`]
574    /// and ack the message.
575    #[instrument(skip(self), level = "trace")]
576    pub async fn dispatch_ok<'a>(
577        &self,
578        ExecutionOk {
579            routing_key,
580            output,
581        }: ExecutionOk,
582    ) -> Result<()> {
583        let sender = self.get_result_sender(routing_key).await?;
584        sender.publish(&AnyTaskResult::Ok(output)).await?;
585        sender.close().await?;
586        Ok(())
587    }
588
589    /// A default worker loop that can be used to process
590    /// [`Task`]s.
591    ///
592    /// Worker implementations generally wont vary, as the their
593    /// primary responsibility is to process incoming tasks. We provide one
594    /// out of the box that will work for most use cases. Users are free to
595    /// implement their own if they need to.
596    ///
597    /// Note that if you define your operations in a separate crate, you'll need
598    /// to use the [`registry!`](crate::registry) macro to register them with
599    /// the runtime.
600    ///
601    /// # Example
602    /// ```no_run
603    /// use paladin::{
604    ///     RemoteExecute,
605    ///     runtime::WorkerRuntime,
606    ///     config::Config,
607    ///     task::Task,
608    ///     operation::{Result, Operation},
609    ///     registry,
610    /// };
611    /// use clap::Parser;
612    /// use serde::{Deserialize, Serialize};
613    /// # #[derive(Serialize, Deserialize, RemoteExecute)]
614    /// # struct StringLength;
615    /// # impl Operation for StringLength {
616    /// #    type Input = String;
617    /// #    type Output = usize;
618    /// #
619    /// #    fn execute(&self, input: Self::Input) -> Result<Self::Output> {
620    /// #       Ok(input.len())
621    /// #    }
622    /// # }
623    /// #
624    ///
625    /// #[derive(Parser, Debug)]
626    /// pub struct Cli {
627    ///     #[command(flatten)]
628    ///     pub options: Config,
629    /// }
630    ///
631    /// registry!();
632    ///
633    /// #[tokio::main]
634    /// async fn main() -> anyhow::Result<()> {
635    ///     let args = Cli::parse();
636    ///     let runtime = WorkerRuntime::from_config(&args.options, register()).await?;
637    ///     runtime.main_loop().await?;
638    ///
639    ///     Ok(())
640    /// }
641    /// ```
642    #[instrument(skip(self), level = "trace")]
643    pub async fn main_loop(&self) -> Result<()> {
644        let mut task_stream = self.get_task_receiver().await?;
645
646        const TERMINATION_CLEAR_INTERVAL: Duration = Duration::from_secs(60);
647        // Keep track of terminated jobs to avoid processing new tasks associated to
648        // them.
649        let terminated_jobs: Arc<DashMap<String, Instant>> = Default::default();
650
651        // Spawn a task that will periodically clear the terminated jobs map.
652        let reaper = tokio::spawn({
653            let terminated_jobs = terminated_jobs.clone();
654            async move {
655                loop {
656                    terminated_jobs.retain(|_, v| v.elapsed() < TERMINATION_CLEAR_INTERVAL);
657                    tokio::time::sleep(TERMINATION_CLEAR_INTERVAL).await;
658                }
659            }
660        });
661
662        let identifier: String = get_random_routing_key();
663
664        // Create a watch channel for signaling IPC changes while processing a task.
665        let (ipc_sig_term_tx, ipc_sig_term_rx) = tokio::sync::watch::channel::<String>(identifier);
666
667        // Spawn a task that will listen for IPC termination signals and mark jobs as
668        // terminated.
669        let mut ipc_receiver = self.get_ipc_receiver().await?;
670        let remote_ipc_sig_term_handler = tokio::spawn({
671            let terminated_jobs = terminated_jobs.clone();
672
673            async move {
674                while let Some(ipc) = ipc_receiver.next().await {
675                    match ipc {
676                        WorkerIpc::ExecutionError { routing_key } => {
677                            // Mark the job as terminated if it hasn't been already.
678                            if mark_terminated(&terminated_jobs, routing_key.clone()) {
679                                warn!(routing_key = %routing_key, "received IPC termination signal");
680                                // Notify any currently executing tasks of the error.
681                                ipc_sig_term_tx.send_replace(routing_key.clone());
682                            }
683                        }
684                    }
685                }
686            }
687        });
688
689        /// Helper to mark a job as terminated.
690        ///
691        /// Returns `true` if the job was marked as terminated, `false`
692        /// otherwise (i.e., it was already marked terminated).
693        #[inline]
694        fn mark_terminated(
695            terminated_jobs: &DashMap<String, Instant>,
696            routing_key: String,
697        ) -> bool {
698            if let Entry::Vacant(entry) = terminated_jobs.entry(routing_key.clone()) {
699                entry.insert(Instant::now());
700                return true;
701            }
702            false
703        }
704
705        while let Some((payload, acker)) = task_stream.next().await {
706            // Skip tasks associated with terminated jobs.
707            if terminated_jobs.contains_key(&payload.clone().routing_key) {
708                trace!(routing_key = %payload.clone().routing_key, "skipping terminated job");
709                acker.nack().await?;
710
711                continue;
712            }
713
714            let routing_key = payload.clone().routing_key;
715            let routing_key_clone = routing_key.clone();
716
717            let span = debug_span!("remote_execute", routing_key = %routing_key_clone);
718            let execution_task = payload.remote_execute().instrument(span);
719
720            // Create a future that will wait for an IPC termination signal.
721            let ipc_sig_term = {
722                let mut ipc_sig_term_rx = ipc_sig_term_rx.clone();
723                async move {
724                    loop {
725                        ipc_sig_term_rx.changed().await.expect("IPC channel closed");
726                        if *ipc_sig_term_rx.borrow() == routing_key_clone {
727                            return true;
728                        }
729                    }
730                }
731            };
732
733            // Wait for either the task to complete or an IPC termination signal.
734            select! {
735                execution = execution_task => {
736                    match execution {
737                        Ok(output) => {
738                            try_join!(
739                                acker.ack(),
740                                self.dispatch_ok(ExecutionOk {
741                                    routing_key,
742                                    output,
743                                })
744                            )?;
745                        }
746                        Err(err) => {
747                            error!(routing_key = %routing_key, "execution error: {err:?}");
748                            mark_terminated(&terminated_jobs, routing_key.clone());
749
750                            try_join!(
751                                acker.nack(),
752                                self.dispatch_fatal(ExecutionErr {
753                                    routing_key,
754                                    strategy: err.fatal_strategy(),
755                                    err,
756                                })
757                            )?;
758                        }
759                    }
760                }
761                _ = ipc_sig_term => {
762                    warn!(routing_key = %routing_key, "task cancelled via IPC sigterm");
763                    _ = acker.nack().await;
764                }
765            }
766        }
767
768        remote_ipc_sig_term_handler.abort();
769        reaper.abort();
770
771        Ok(())
772    }
773}
774
775mod dynamic_channel;