// paladin/runtime/mod.rs
1//! Utility for managing the routing of distributed tasks and their results.
2//!
3//! This module provides utilities for orchestrating distributed task execution,
4//! focusing on the interplay between operations, their invocations as tasks,
5//! and higher-order directives that manage these tasks.
6//!
7//! It provides two runtimes:
8//! - [`Runtime`]: Used by the orchestrator to manage the execution of tasks.
9//! - [`WorkerRuntime`]: Used by worker nodes to execute tasks.
10//!
11//! # Semantic Overview
12//!
13//! - [`Operation`]: Defines the signature and semantics of a computation. Akin
14//! to a function that maps an input to an output.
15//! - [`Task`]: Represents the invocation of an operation with specific
16//! arguments. The payloads used for coordination.
17//! - [`Directive`](crate::directive::Directive): Manages and orchestrates the
18//! results of multiple tasks, implementing higher-order semantics.
19//!
20//! In the context of our system, we assume:
21//! - [`Operation`]s are expensive as they usually encode hefty computations.
22//! - [`Directive`](crate::directive::Directive)s are cheap since they primarily
23//! manage and coordinate the results of those computations.
24//!
25//! A key concept is the information asymmetry between
26//! [`Directive`](crate::directive::Directive)s and [`Task`]s:
27//! - A [`Directive`](crate::directive::Directive) knows about the [`Task`]s it
28//! manages.
29//! - A [`Task`], however, is oblivious to any
30//! [`Directive`](crate::directive::Directive) overseeing it.
31//!
32//! This asymmetry highlights distinct channeling requirements:
33//! - [`Directive`](crate::directive::Directive)s require a discrete homogenous
34//! stream of results.
35//! - [`Task`] channels must accommodate arbitrary operations, listening on a
36//! single, heterogenous stream of results.
37//!
38//! Practically, this is implemented by using a single, stable channel for
39//! [`Task`]s, and dynamically issuing new channels in context of each
40//! [`Directive`](crate::directive::Directive)'s evaluation. This allows workers
41//! to listen on a single channel and thus execute arbitrary [`Task`]s, while
42//! [`Directive`](crate::directive::Directive)s can listen on an isolated
43//! channel for their results.
44//!
//! The [`Runtime`] implements the semantics of this architecture.
46//! Fundamentally, the [`Runtime`] is responsible for managing the channels that
47//! [`Task`]s and [`Directive`](crate::directive::Directive)s use to
48//! communicate, and provides a simple interface for interacting with these
49//! channels.
50
51use std::{
52 sync::Arc,
53 time::{Duration, Instant},
54};
55
56use anyhow::Result;
57use dashmap::{mapref::entry::Entry, DashMap};
58use futures::{stream::BoxStream, Stream, StreamExt};
59use serde::{Deserialize, Serialize};
60use tokio::{select, task::JoinHandle, try_join};
61use tracing::{debug_span, error, instrument, trace, warn, Instrument};
62
63use self::dynamic_channel::{DynamicChannel, DynamicChannelFactory};
64use crate::{
65 acker::{Acker, ComposedAcker},
66 channel::{
67 coordinated_channel::coordinated_channel, Channel, ChannelFactory, ChannelType, LeaseGuard,
68 },
69 common::get_random_routing_key,
70 config::Config,
71 operation::{marker::Marker, FatalStrategy, Operation},
72 queue::{Publisher, PublisherExt},
73 serializer::{Serializable, Serializer},
74 task::{AnyTask, AnyTaskOutput, AnyTaskResult, Task, TaskResult},
75};
76
/// A boxed stream of items, each paired with an [`Acker`] used to acknowledge
/// (or reject) that delivery.
type Receiver<'a, Item> = Box<dyn Stream<Item = (Item, Box<dyn Acker>)> + Send + Unpin + 'a>;
/// A boxed [`Publisher`] used to dispatch items of the given type.
type Sender<'a, Item> = Box<dyn Publisher<Item> + Send + Unpin + Sync + 'a>;
/// The triple returned by [`Runtime::lease_coordinated_task_channel`]: the
/// leased result channel's identifier, a sender for dispatching [`Task`]s, and
/// a lease-guarded receiver of typed [`TaskResult`]s.
type CoordinatedTaskChannel<'a, Op, Metadata> = (
    String,
    Sender<'a, Task<'a, Op, Metadata>>,
    LeaseGuard<DynamicChannel, Receiver<'a, TaskResult<Op, Metadata>>>,
);
84
/// The core of the distributed task management system.
///
/// Fundamentally, the [`Runtime`] is responsible for managing the channels that
/// [`Task`]s and [`Directive`](crate::directive::Directive)s use to
/// communicate, and provides a simple interface for interacting with these
/// channels.
///
/// This runtime should be used in an orchestration context, where a single node
/// is responsible for managing the execution of tasks. This is in contrast to
/// the [`WorkerRuntime`], which is used by worker nodes that execute tasks.
///
/// The primary purpose of this runtime is to facilitate the instantiation of
/// new coordination channels for dispatching [`Task`]s and receiving their
/// associated [`TaskResult`]s. It takes care of namespacing new result channels
/// such that users can consume them in an isolated manner.
/// [`Directive`](crate::directive::Directive)s will use this to create a siloed
/// channel upon which they can listen to only the results of the [`Task`]s they
/// manage. It also takes care of synchronizing disparate sender and receiver
/// channels such that closing the sender terminates the receiver.
///
/// ## Emulation
/// The main [`Runtime`] provides emulation functionality for the worker. This
/// can be enabled by passing the
/// [`config::Runtime::InMemory`](crate::config::Runtime::InMemory) field as
/// part of the configuration to [`Runtime::from_config`]. This will spin up a
/// multi-threaded worker emulator that will execute multiple [`WorkerRuntime`]s
/// concurrently. This can be used to simulate a distributed environment
/// in-memory, and finds immediate practical use in writing tests.
///
/// See the [runtime module documentation](crate::runtime) for more information
/// on runtime semantics.

pub struct Runtime {
    // Factory used to issue a fresh, namespaced result channel for each
    // directive evaluation.
    channel_factory: DynamicChannelFactory,
    // The single, stable channel on which `Task`s are dispatched to workers.
    task_channel: DynamicChannel,
    // Serialization format used when encoding typed `Task`s into `AnyTask`s.
    serializer: Serializer,
    // Join handles for the in-memory worker emulator tasks, if running with
    // `config::Runtime::InMemory`; aborted on drop.
    worker_emulator: Option<Vec<JoinHandle<Result<()>>>>,
    // Marker produced by the operation registry (see `registry!`); held but
    // not otherwise read here.
    _marker: Marker,
}
124
/// Routing key of the broadcast channel used for worker inter-process messages.
const IPC_ROUTING_KEY: &str = "ipc-routing-key";
/// Default routing key for the task bus when none is provided in the [`Config`].
pub const DEFAULT_ROUTING_KEY: &str = "default";
127
impl Runtime {
    /// Initializes the [`Runtime`] with the provided [`Config`].
    ///
    /// Connects to the task channel using the configured routing key (falling
    /// back to [`DEFAULT_ROUTING_KEY`]) and, when the config selects the
    /// in-memory runtime, spawns a worker emulator.
    pub async fn from_config(config: &Config, marker: Marker) -> Result<Self> {
        let channel_factory = DynamicChannelFactory::from_config(config).await?;
        let task_channel = channel_factory
            .get(
                config
                    .task_bus_routing_key
                    .clone()
                    .unwrap_or_else(|| DEFAULT_ROUTING_KEY.to_string()),
                ChannelType::ExactlyOnce,
            )
            .await?;
        let serializer = Serializer::from(config);

        // Spin up an emulator for the worker runtime if we're running in-memory.
        let worker_emulator = match config.runtime {
            crate::config::Runtime::InMemory => Some(Self::spawn_emulator(
                channel_factory.clone(),
                task_channel.clone(),
                // Default to 3 emulated workers when unspecified.
                config.num_workers.unwrap_or(3),
            )),
            _ => None,
        };

        Ok(Self {
            channel_factory,
            task_channel,
            serializer,
            worker_emulator,
            _marker: marker,
        })
    }

    /// Short-hand for initializing an in-memory [`Runtime`].
    pub async fn in_memory() -> Result<Self> {
        let config = Config {
            runtime: crate::config::Runtime::InMemory,
            ..Default::default()
        };
        Self::from_config(&config, Marker).await
    }

    /// Spawns an emulator for the worker runtime.
    ///
    /// This is used to emulate the worker runtime when running in-memory.
    /// Each spawned task runs a full [`WorkerRuntime::main_loop`] over the
    /// shared task channel.
    fn spawn_emulator(
        channel_factory: DynamicChannelFactory,
        task_channel: DynamicChannel,
        num_threads: usize,
    ) -> Vec<JoinHandle<Result<()>>> {
        (0..num_threads)
            .map(|_| {
                let channel_factory = channel_factory.clone();
                let task_channel = task_channel.clone();
                tokio::spawn(async move {
                    let worker_runtime = WorkerRuntime {
                        channel_factory,
                        task_channel,
                        _marker: Marker,
                    };
                    worker_runtime.main_loop().await?;
                    Ok(())
                })
            })
            .collect()
    }

    /// Closes the task channel, signaling that no further [`Task`]s will be
    /// dispatched by this runtime.
    pub async fn close(&self) -> Result<()> {
        self.task_channel.close().await
    }

    /// Provides a [`Publisher`] for dispatching [`Task`]s of the specified
    /// [`Operation`] and its associated `Metadata`.
    ///
    /// This generally shouldn't be used by itself if the caller is interested
    /// in receiving the results associated with dispatched [`Task`]s.
    /// Instead, consider using
    /// [`lease_coordinated_task_channel`](Self::lease_coordinated_task_channel),
    /// which uses this function internally and provides additional
    /// coordination guarantees.
    ///
    /// # Metadata
    /// `Metadata` is a generic parameter that implements [`Serializable`], used
    /// to encode additional information into a [`Task`] that is relevant to
    /// the particular [`Directive`](crate::directive::Directive) orchestrating
    /// it. This can be useful in cases where a
    /// [`Directive`](crate::directive::Directive) needs to coordinate the
    /// results of multiple [`Task`]s.
    #[instrument(skip_all, level = "debug")]
    async fn get_task_sender<'a, Op: Operation + 'a, Metadata: Serializable + 'a>(
        &self,
    ) -> Result<Sender<'a, Task<'a, Op, Metadata>>> {
        // Get a publisher for the task channel, which accepts `AnyTask`.
        let sender = self.task_channel.sender::<AnyTask>().await?;
        let serializer = self.serializer;
        // Transform the sender to accept typed `Task`s by serializing them into
        // `AnyTask`s on behalf of the caller. This allows the caller to pass in
        // a typed `Task` without having to worry about serialization.
        let transformed_sender =
            sender.with(move |task: &Task<'_, Op, Metadata>| task.as_any_task(serializer));

        Ok(Box::new(transformed_sender))
    }

    /// Leases a new discrete
    /// [coordinated](crate::channel::coordinated_channel::coordinated_channel)
    /// channel for asynchronously dispatching [`Task`]s and receiving their
    /// [`TaskResult`]s.
    ///
    /// # Design notes
    ///
    /// - The returned receiver is wrapped in a [`LeaseGuard`] that will release
    ///   the leased [`Channel`] once dropped.
    /// - A unique identifier is returned for the newly leased [`Channel`] that
    ///   is bound to the receiver. This can be used to route [`TaskResult`]s
    ///   back to that channel such that they're propagated to provided
    ///   receiver.
    /// - Even though the `sender` and `receiver` interact with two distinct
    ///   channels, they are bound by
    ///   [`coordinated_channel`](crate::channel::coordinated_channel::coordinated_channel),
    ///   which ensures that open/closed state and pending message state are
    ///   synchronized between them.
    /// - The provided receiver automatically deserializes [`AnyTaskResult`]s
    ///   into typed [`TaskResult`]s on behalf of the caller.
    ///
    /// Users should take care to close the sender once all tasks have been
    /// dispatched. This will ensure that the receiver is terminated once
    /// all results have been received. Without this, the receiver will block
    /// indefinitely. Alternatively, returning early without closing the
    /// sender will be fine too, as the sender will be closed once dropped.
    ///
    /// # Example
    /// ```
    /// use paladin::{
    ///     RemoteExecute,
    ///     runtime::Runtime,
    ///     task::Task,
    ///     operation::{Operation, Result},
    /// };
    /// use serde::{Deserialize, Serialize};
    /// use futures::StreamExt;
    ///
    /// #[derive(Serialize, Deserialize)]
    /// struct Metadata {
    ///     id: usize,
    /// }
    /// # #[derive(Serialize, Deserialize, RemoteExecute)]
    /// # struct StringLength;
    /// # impl Operation for StringLength {
    /// #    type Input = String;
    /// #    type Output = usize;
    /// #
    /// #    fn execute(&self, input: Self::Input) -> Result<Self::Output> {
    /// #        Ok(input.len())
    /// #    }
    /// # }
    ///
    /// async fn run_task<Op: Operation>(op: &Op, runtime: &Runtime, input: Op::Input) -> anyhow::Result<()> {
    ///     let (identifier, mut sender, mut receiver) = runtime.lease_coordinated_task_channel::<Op, Metadata>().await?;
    ///
    ///     // Issue a task with the identifier of the receiver
    ///     sender.publish(&Task {
    ///         routing_key: identifier,
    ///         metadata: Metadata { id: 0 },
    ///         op,
    ///         input,
    ///     })
    ///     .await?;
    ///     sender.close().await?;
    ///
    ///
    ///     while let Some((result, acker)) = receiver.next().await {
    ///         // ... handle result
    ///     }
    ///
    ///     Ok(())
    /// }
    /// ```
    ///
    /// # Metadata
    /// `Metadata` is a generic parameter that implements [`Serializable`], used
    /// to encode additional information into a [`Task`] that is relevant to
    /// the particular [`Directive`](crate::directive::Directive) orchestrating
    /// it. This can be useful in cases where a
    /// [`Directive`](crate::directive::Directive) needs to coordinate the
    /// results of multiple [`Task`]s.
    #[instrument(skip_all, level = "debug")]
    pub async fn lease_coordinated_task_channel<
        'a,
        Op: Operation + 'a,
        Metadata: Serializable + 'a,
    >(
        &self,
    ) -> Result<CoordinatedTaskChannel<'a, Op, Metadata>> {
        // Issue a new channel and return its identifier paired with a stream of
        // results.
        let (task_sender, (result_channel_identifier, result_channel)) = try_join!(
            self.get_task_sender(),
            self.channel_factory.issue(ChannelType::ExactlyOnce)
        )?;

        // Transform the stream to deserialize the results into typed `TaskResult`s on
        // behalf of the caller. This allows the caller to receive typed
        // `TaskResult`s without having to worry about deserialization.
        let receiver = result_channel
            .receiver::<AnyTaskResult>()
            .await?
            .map(move |(result, acker)| (result.into_task_result::<Op, Metadata>(), acker));

        // Enable coordination between the task and result channels.
        let (sender, receiver) = coordinated_channel(task_sender, receiver);

        // The stream returned by `coordinated_channel` has its own `Acker` that is used
        // to keep track of the number of successfully processed results. This
        // is done to ensure the Stream can be properly drained when a Sender is closed.
        //
        // We wrap the original receiver's `Acker` in a `ComposedAcker` that will also
        // ack the coordinated Stream if it is successful. This prevents
        // out-of-sync issues where the `CoordinatedStream` removes a pending send from
        // its state, but the original message failed to ack.
        let ack_composed_receiver =
            receiver.map(|((result, original_acker), coordinated_acker)| {
                (
                    result,
                    Box::new(ComposedAcker::new(original_acker, coordinated_acker))
                        as Box<dyn Acker>,
                )
            });

        Ok((
            result_channel_identifier,
            Box::new(sender),
            // Wrap the stream in a `LeaseGuard` that will release the channel once dropped.
            LeaseGuard::new(result_channel, Box::new(ack_composed_receiver)),
        ))
    }
}
366
367/// Drop the worker emulator when the runtime is dropped.
368impl Drop for Runtime {
369 fn drop(&mut self) {
370 if let Some(worker_emulator) = self.worker_emulator.take() {
371 for handle in worker_emulator {
372 handle.abort();
373 }
374 }
375 }
376}
377
/// A runtime for worker nodes.
///
/// This runtime provides functionality for the three responsibilities of a
/// worker process.
/// 1. Listen for new [`Task`]s.
/// 2. Execute those [`Task`]s.
/// 3. Send back the results of a [`Task`] execution.
#[derive(Clone)]
pub struct WorkerRuntime {
    // Factory used to open result and IPC channels on demand.
    channel_factory: DynamicChannelFactory,
    // The stable channel from which incoming `Task`s are consumed.
    task_channel: DynamicChannel,
    // Marker produced by the operation registry (see `registry!`); held but
    // not otherwise read here.
    _marker: Marker,
}
391
/// The successful outcome of a [`Task`] execution, routed back to the
/// orchestrator via [`WorkerRuntime::dispatch_ok`].
#[derive(Debug)]
pub struct ExecutionOk {
    /// Identifier of the result channel the output should be routed to.
    pub routing_key: String,
    /// The type-erased output of the executed [`Task`].
    pub output: AnyTaskOutput,
}
397
/// A failed [`Task`] execution, reported via
/// [`WorkerRuntime::dispatch_fatal`].
#[derive(Debug)]
pub struct ExecutionErr<E> {
    // Identifier of the result channel the error should be routed to.
    routing_key: String,
    // The underlying execution error.
    err: E,
    // How the runtime should react to the error (ignore or terminate).
    strategy: FatalStrategy,
}
404
/// Inter-process messages between workers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WorkerIpc {
    /// Broadcast when a task belonging to the given routing key hit a fatal
    /// error, signaling other workers to skip or cancel work for that job.
    ExecutionError { routing_key: String },
}
410
411impl WorkerRuntime {
412 /// Initializes the [`WorkerRuntime`] with the provided [`Config`].
413 pub async fn from_config(config: &Config, marker: Marker) -> Result<Self> {
414 let channel_factory = DynamicChannelFactory::from_config(config).await?;
415 let task_channel = channel_factory
416 .get(
417 config
418 .task_bus_routing_key
419 .clone()
420 .unwrap_or_else(|| DEFAULT_ROUTING_KEY.to_string()),
421 ChannelType::ExactlyOnce,
422 )
423 .await?;
424
425 Ok(Self {
426 channel_factory,
427 task_channel,
428 _marker: marker,
429 })
430 }
431
432 /// Provides a [`Publisher`] for dispatching [`AnyTaskResult`]s.
433 ///
434 /// Typically used by a worker node for send back the results of a [`Task`]
435 /// execution.
436 #[instrument(skip(self), level = "trace")]
437 pub async fn get_result_sender(&self, identifier: String) -> Result<Sender<AnyTaskResult>> {
438 self.channel_factory
439 .get(identifier, ChannelType::ExactlyOnce)
440 .await?
441 .sender::<AnyTaskResult>()
442 .await
443 }
444
445 /// Get a [`Publisher`] for dispatching [`WorkerIpc`] messages.
446 ///
447 /// Typically used by a worker node to notify other workers of a fatal
448 /// error.
449 #[instrument(skip(self), level = "trace")]
450 pub async fn get_ipc_sender(&self) -> Result<Sender<WorkerIpc>> {
451 self.channel_factory
452 .get(IPC_ROUTING_KEY.to_string(), ChannelType::Broadcast)
453 .await?
454 .sender()
455 .await
456 }
457
458 /// Get a [`Stream`] for receiving [`WorkerIpc`] messages.
459 ///
460 /// Typically used by a worker node to listen for fatal errors from other
461 /// workers.
462 #[instrument(skip(self), level = "trace")]
463 pub async fn get_ipc_receiver(&self) -> Result<BoxStream<'static, WorkerIpc>> {
464 let s = self
465 .channel_factory
466 .get(IPC_ROUTING_KEY.to_string(), ChannelType::Broadcast)
467 .await?
468 .receiver::<WorkerIpc>()
469 .await?;
470
471 Ok(s.then(|(message, acker)| async move {
472 // auto-ack
473 _ = acker.ack().await;
474 message
475 })
476 .boxed())
477 }
478
479 /// Provides a [`Stream`] incoming [`AnyTask`]s.
480 ///
481 /// Typically the the worker node's first interaction with the [`Runtime`].
482 /// This is how workers receive [`Task`]s for remote execution.
483 ///
484 /// # Example
485 /// ```no_run
486 /// use clap::Parser;
487 /// use paladin::{
488 /// RemoteExecute,
489 /// config::Config,
490 /// registry,
491 /// runtime::WorkerRuntime,
492 /// operation::{Operation, Result},
493 /// };
494 /// use serde::{Deserialize, Serialize};
495 /// use futures::StreamExt;
496 /// #
497 /// # #[derive(Serialize, Deserialize, RemoteExecute)]
498 /// # struct StringLength;
499 /// #
500 /// # impl Operation for StringLength {
501 /// # type Input = String;
502 /// # type Output = usize;
503 /// #
504 /// # fn execute(&self, input: Self::Input) -> Result<Self::Output> {
505 /// # Ok(input.len())
506 /// # }
507 /// # }
508 /// #
509 ///
510 /// #[derive(Parser, Debug)]
511 /// pub struct Cli {
512 /// #[command(flatten)]
513 /// pub options: Config,
514 /// }
515 ///
516 /// paladin::registry!();
517 ///
518 /// #[tokio::main]
519 /// async fn main() -> anyhow::Result<()> {
520 /// let args = Cli::parse();
521 /// let runtime = WorkerRuntime::from_config(&args.options, register()).await?;
522 ///
523 /// let mut task_stream = runtime.get_task_receiver().await?;
524 /// while let Some((task, delivery)) = task_stream.next().await {
525 /// // ... handle task
526 /// }
527 /// # Ok(())
528 /// }
529 /// ```
530 #[instrument(skip_all, level = "trace")]
531 pub async fn get_task_receiver(&self) -> Result<Receiver<AnyTask>> {
532 self.task_channel.receiver().await
533 }
534
535 /// Notify the [`Runtime`] of a fatal error.
536 ///
537 /// This will pass the error back to the consumer of the [`Task`] and notify
538 /// other workers of the error.
539 #[instrument(skip(self), level = "trace")]
540 pub async fn dispatch_fatal<E>(
541 &self,
542 ExecutionErr {
543 routing_key,
544 err,
545 strategy,
546 }: ExecutionErr<E>,
547 ) -> Result<()>
548 where
549 E: std::fmt::Display + std::fmt::Debug,
550 {
551 match strategy {
552 FatalStrategy::Ignore => Ok(()),
553 FatalStrategy::Terminate => {
554 // Notify other workers of the error.
555 let (ipc, sender) = try_join!(
556 self.get_ipc_sender(),
557 self.get_result_sender(routing_key.clone())
558 )?;
559
560 let ipc_msg = WorkerIpc::ExecutionError { routing_key };
561 let sender_msg = AnyTaskResult::Err(err.to_string());
562
563 try_join!(ipc.publish(&ipc_msg), sender.publish(&sender_msg))?;
564 try_join!(ipc.close(), sender.close())?;
565
566 Ok(())
567 }
568 }
569 }
570
571 /// Notify the [`Runtime`] of a successful execution.
572 ///
573 /// This will pass the output back to the consumer of the [`Task`]
574 /// and ack the message.
575 #[instrument(skip(self), level = "trace")]
576 pub async fn dispatch_ok<'a>(
577 &self,
578 ExecutionOk {
579 routing_key,
580 output,
581 }: ExecutionOk,
582 ) -> Result<()> {
583 let sender = self.get_result_sender(routing_key).await?;
584 sender.publish(&AnyTaskResult::Ok(output)).await?;
585 sender.close().await?;
586 Ok(())
587 }
588
589 /// A default worker loop that can be used to process
590 /// [`Task`]s.
591 ///
592 /// Worker implementations generally wont vary, as the their
593 /// primary responsibility is to process incoming tasks. We provide one
594 /// out of the box that will work for most use cases. Users are free to
595 /// implement their own if they need to.
596 ///
597 /// Note that if you define your operations in a separate crate, you'll need
598 /// to use the [`registry!`](crate::registry) macro to register them with
599 /// the runtime.
600 ///
601 /// # Example
602 /// ```no_run
603 /// use paladin::{
604 /// RemoteExecute,
605 /// runtime::WorkerRuntime,
606 /// config::Config,
607 /// task::Task,
608 /// operation::{Result, Operation},
609 /// registry,
610 /// };
611 /// use clap::Parser;
612 /// use serde::{Deserialize, Serialize};
613 /// # #[derive(Serialize, Deserialize, RemoteExecute)]
614 /// # struct StringLength;
615 /// # impl Operation for StringLength {
616 /// # type Input = String;
617 /// # type Output = usize;
618 /// #
619 /// # fn execute(&self, input: Self::Input) -> Result<Self::Output> {
620 /// # Ok(input.len())
621 /// # }
622 /// # }
623 /// #
624 ///
625 /// #[derive(Parser, Debug)]
626 /// pub struct Cli {
627 /// #[command(flatten)]
628 /// pub options: Config,
629 /// }
630 ///
631 /// registry!();
632 ///
633 /// #[tokio::main]
634 /// async fn main() -> anyhow::Result<()> {
635 /// let args = Cli::parse();
636 /// let runtime = WorkerRuntime::from_config(&args.options, register()).await?;
637 /// runtime.main_loop().await?;
638 ///
639 /// Ok(())
640 /// }
641 /// ```
642 #[instrument(skip(self), level = "trace")]
643 pub async fn main_loop(&self) -> Result<()> {
644 let mut task_stream = self.get_task_receiver().await?;
645
646 const TERMINATION_CLEAR_INTERVAL: Duration = Duration::from_secs(60);
647 // Keep track of terminated jobs to avoid processing new tasks associated to
648 // them.
649 let terminated_jobs: Arc<DashMap<String, Instant>> = Default::default();
650
651 // Spawn a task that will periodically clear the terminated jobs map.
652 let reaper = tokio::spawn({
653 let terminated_jobs = terminated_jobs.clone();
654 async move {
655 loop {
656 terminated_jobs.retain(|_, v| v.elapsed() < TERMINATION_CLEAR_INTERVAL);
657 tokio::time::sleep(TERMINATION_CLEAR_INTERVAL).await;
658 }
659 }
660 });
661
662 let identifier: String = get_random_routing_key();
663
664 // Create a watch channel for signaling IPC changes while processing a task.
665 let (ipc_sig_term_tx, ipc_sig_term_rx) = tokio::sync::watch::channel::<String>(identifier);
666
667 // Spawn a task that will listen for IPC termination signals and mark jobs as
668 // terminated.
669 let mut ipc_receiver = self.get_ipc_receiver().await?;
670 let remote_ipc_sig_term_handler = tokio::spawn({
671 let terminated_jobs = terminated_jobs.clone();
672
673 async move {
674 while let Some(ipc) = ipc_receiver.next().await {
675 match ipc {
676 WorkerIpc::ExecutionError { routing_key } => {
677 // Mark the job as terminated if it hasn't been already.
678 if mark_terminated(&terminated_jobs, routing_key.clone()) {
679 warn!(routing_key = %routing_key, "received IPC termination signal");
680 // Notify any currently executing tasks of the error.
681 ipc_sig_term_tx.send_replace(routing_key.clone());
682 }
683 }
684 }
685 }
686 }
687 });
688
689 /// Helper to mark a job as terminated.
690 ///
691 /// Returns `true` if the job was marked as terminated, `false`
692 /// otherwise (i.e., it was already marked terminated).
693 #[inline]
694 fn mark_terminated(
695 terminated_jobs: &DashMap<String, Instant>,
696 routing_key: String,
697 ) -> bool {
698 if let Entry::Vacant(entry) = terminated_jobs.entry(routing_key.clone()) {
699 entry.insert(Instant::now());
700 return true;
701 }
702 false
703 }
704
705 while let Some((payload, acker)) = task_stream.next().await {
706 // Skip tasks associated with terminated jobs.
707 if terminated_jobs.contains_key(&payload.clone().routing_key) {
708 trace!(routing_key = %payload.clone().routing_key, "skipping terminated job");
709 acker.nack().await?;
710
711 continue;
712 }
713
714 let routing_key = payload.clone().routing_key;
715 let routing_key_clone = routing_key.clone();
716
717 let span = debug_span!("remote_execute", routing_key = %routing_key_clone);
718 let execution_task = payload.remote_execute().instrument(span);
719
720 // Create a future that will wait for an IPC termination signal.
721 let ipc_sig_term = {
722 let mut ipc_sig_term_rx = ipc_sig_term_rx.clone();
723 async move {
724 loop {
725 ipc_sig_term_rx.changed().await.expect("IPC channel closed");
726 if *ipc_sig_term_rx.borrow() == routing_key_clone {
727 return true;
728 }
729 }
730 }
731 };
732
733 // Wait for either the task to complete or an IPC termination signal.
734 select! {
735 execution = execution_task => {
736 match execution {
737 Ok(output) => {
738 try_join!(
739 acker.ack(),
740 self.dispatch_ok(ExecutionOk {
741 routing_key,
742 output,
743 })
744 )?;
745 }
746 Err(err) => {
747 error!(routing_key = %routing_key, "execution error: {err:?}");
748 mark_terminated(&terminated_jobs, routing_key.clone());
749
750 try_join!(
751 acker.nack(),
752 self.dispatch_fatal(ExecutionErr {
753 routing_key,
754 strategy: err.fatal_strategy(),
755 err,
756 })
757 )?;
758 }
759 }
760 }
761 _ = ipc_sig_term => {
762 warn!(routing_key = %routing_key, "task cancelled via IPC sigterm");
763 _ = acker.nack().await;
764 }
765 }
766 }
767
768 remote_ipc_sig_term_handler.abort();
769 reaper.abort();
770
771 Ok(())
772 }
773}
774
775mod dynamic_channel;