Skip to main content

rustvello_core/
broker.rs

1use async_trait::async_trait;
2use tokio_util::sync::CancellationToken;
3use tracing::instrument;
4
5use rustvello_proto::identifiers::{InvocationId, TaskId};
6
7use crate::error::RustvelloResult;
8
9/// Message broker interface for routing invocations to runners.
10///
11/// Mirrors pynenc's `BaseBroker`. The broker is a queue that accepts
12/// invocations from the orchestrator and delivers them to runners.
13///
14/// ## Cross-language routing
15///
16/// When a task has a non-empty `language` field in its [`TaskId`],
17/// the broker routes it to a language-specific queue. Workers only
18/// retrieve invocations for their own language via
19/// [`retrieve_invocation_for_language`].
20#[async_trait]
21pub trait Broker: Send + Sync {
22    /// Queue an invocation for processing by a runner.
23    ///
24    /// When the task ID is unknown at the call site this is the fallback.
25    /// Prefer [`route_invocation_for_task`] when the task ID is available —
26    /// backends that support per-task queue isolation (e.g. `MemBroker`) use
27    /// it to enable task-filtered retrieval.
28    async fn route_invocation(&self, invocation_id: &InvocationId) -> RustvelloResult<()>;
29
30    /// Queue an invocation for processing, with the task ID for per-task routing.
31    ///
32    /// Default implementation calls [`route_invocation`] (global queue).
33    /// Override in backends that support per-task queue isolation.
34    #[instrument(skip(self), fields(%invocation_id))]
35    async fn route_invocation_for_task(
36        &self,
37        invocation_id: &InvocationId,
38        _task_id: &TaskId,
39    ) -> RustvelloResult<()> {
40        self.route_invocation(invocation_id).await
41    }
42
43    /// Queue multiple invocations at once (batch optimization).
44    #[instrument(skip(self, ids), fields(count = ids.len()))]
45    async fn route_invocations(&self, ids: &[InvocationId]) -> RustvelloResult<()> {
46        for id in ids {
47            self.route_invocation(id).await?;
48        }
49        Ok(())
50    }
51
52    /// Retrieve the next invocation to process.
53    /// Returns `None` if the queue is empty.
54    async fn retrieve_invocation(
55        &self,
56        task_id: Option<&TaskId>,
57    ) -> RustvelloResult<Option<InvocationId>>;
58
59    /// Retrieve the next invocation for a specific language worker.
60    ///
61    /// Returns invocations routed to the given language queue.
62    ///
63    /// **Note:** Implementations that check the global queue first (e.g. `MemBroker`)
64    /// may "steal" invocations intended for all workers before language-agnostic
65    /// workers get a chance. In mixed-language deployments, prefer routing
66    /// language-specific tasks to per-task queues via [`route_invocation_for_task`]
67    /// and use [`retrieve_invocation`] with `None` only for language-agnostic work.
68    ///
69    /// Default implementation falls back to [`retrieve_invocation`] with no
70    /// language filtering — backends should override for proper isolation.
71    #[instrument(skip(self))]
72    async fn retrieve_invocation_for_language(
73        &self,
74        _language: &str,
75    ) -> RustvelloResult<Option<InvocationId>> {
76        self.retrieve_invocation(None).await
77    }
78
79    /// Retrieve up to `max` invocations at once (batch optimization).
80    ///
81    /// Default implementation calls [`retrieve_invocation`] in a loop.
82    /// Backends should override for a single lock acquisition.
83    #[instrument(skip(self))]
84    async fn retrieve_invocations(
85        &self,
86        max: usize,
87        task_id: Option<&TaskId>,
88    ) -> RustvelloResult<Vec<InvocationId>> {
89        let capped = max.min(10_000);
90        let mut results = Vec::with_capacity(capped);
91        for _ in 0..capped {
92            match self.retrieve_invocation(task_id).await? {
93                Some(id) => results.push(id),
94                None => break,
95            }
96        }
97        Ok(results)
98    }
99
100    /// Block until work is available or cancellation is requested.
101    ///
102    /// Returns `true` if work may be available, `false` if cancelled.
103    /// Default implementation sleeps for 100ms. Backends with notification
104    /// support (e.g. `MemBroker`) should override with zero-cost waiting.
105    async fn wait_for_work(&self, cancel: &CancellationToken) -> bool {
106        tokio::select! {
107            _ = cancel.cancelled() => false,
108            _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => true,
109        }
110    }
111
112    /// Count queued invocations, optionally filtered by task.
113    async fn count_invocations(&self, task_id: Option<&TaskId>) -> RustvelloResult<usize>;
114
115    /// Remove all queued invocations.
116    async fn purge(&self, task_id: Option<&TaskId>) -> RustvelloResult<()>;
117}