rustvello_core/broker.rs
1use async_trait::async_trait;
2use tokio_util::sync::CancellationToken;
3use tracing::instrument;
4
5use rustvello_proto::identifiers::{InvocationId, TaskId};
6
7use crate::error::RustvelloResult;
8
9/// Message broker interface for routing invocations to runners.
10///
11/// Mirrors pynenc's `BaseBroker`. The broker is a queue that accepts
12/// invocations from the orchestrator and delivers them to runners.
13///
14/// ## Cross-language routing
15///
16/// When a task has a non-empty `language` field in its [`TaskId`],
17/// the broker routes it to a language-specific queue. Workers only
18/// retrieve invocations for their own language via
19/// [`retrieve_invocation_for_language`].
20#[async_trait]
21pub trait Broker: Send + Sync {
22 /// Queue an invocation for processing by a runner.
23 ///
24 /// When the task ID is unknown at the call site this is the fallback.
25 /// Prefer [`route_invocation_for_task`] when the task ID is available —
26 /// backends that support per-task queue isolation (e.g. `MemBroker`) use
27 /// it to enable task-filtered retrieval.
28 async fn route_invocation(&self, invocation_id: &InvocationId) -> RustvelloResult<()>;
29
30 /// Queue an invocation for processing, with the task ID for per-task routing.
31 ///
32 /// Default implementation calls [`route_invocation`] (global queue).
33 /// Override in backends that support per-task queue isolation.
34 #[instrument(skip(self), fields(%invocation_id))]
35 async fn route_invocation_for_task(
36 &self,
37 invocation_id: &InvocationId,
38 _task_id: &TaskId,
39 ) -> RustvelloResult<()> {
40 self.route_invocation(invocation_id).await
41 }
42
43 /// Queue multiple invocations at once (batch optimization).
44 #[instrument(skip(self, ids), fields(count = ids.len()))]
45 async fn route_invocations(&self, ids: &[InvocationId]) -> RustvelloResult<()> {
46 for id in ids {
47 self.route_invocation(id).await?;
48 }
49 Ok(())
50 }
51
52 /// Retrieve the next invocation to process.
53 /// Returns `None` if the queue is empty.
54 async fn retrieve_invocation(
55 &self,
56 task_id: Option<&TaskId>,
57 ) -> RustvelloResult<Option<InvocationId>>;
58
59 /// Retrieve the next invocation for a specific language worker.
60 ///
61 /// Returns invocations routed to the given language queue.
62 ///
63 /// **Note:** Implementations that check the global queue first (e.g. `MemBroker`)
64 /// may "steal" invocations intended for all workers before language-agnostic
65 /// workers get a chance. In mixed-language deployments, prefer routing
66 /// language-specific tasks to per-task queues via [`route_invocation_for_task`]
67 /// and use [`retrieve_invocation`] with `None` only for language-agnostic work.
68 ///
69 /// Default implementation falls back to [`retrieve_invocation`] with no
70 /// language filtering — backends should override for proper isolation.
71 #[instrument(skip(self))]
72 async fn retrieve_invocation_for_language(
73 &self,
74 _language: &str,
75 ) -> RustvelloResult<Option<InvocationId>> {
76 self.retrieve_invocation(None).await
77 }
78
79 /// Retrieve up to `max` invocations at once (batch optimization).
80 ///
81 /// Default implementation calls [`retrieve_invocation`] in a loop.
82 /// Backends should override for a single lock acquisition.
83 #[instrument(skip(self))]
84 async fn retrieve_invocations(
85 &self,
86 max: usize,
87 task_id: Option<&TaskId>,
88 ) -> RustvelloResult<Vec<InvocationId>> {
89 let capped = max.min(10_000);
90 let mut results = Vec::with_capacity(capped);
91 for _ in 0..capped {
92 match self.retrieve_invocation(task_id).await? {
93 Some(id) => results.push(id),
94 None => break,
95 }
96 }
97 Ok(results)
98 }
99
100 /// Block until work is available or cancellation is requested.
101 ///
102 /// Returns `true` if work may be available, `false` if cancelled.
103 /// Default implementation sleeps for 100ms. Backends with notification
104 /// support (e.g. `MemBroker`) should override with zero-cost waiting.
105 async fn wait_for_work(&self, cancel: &CancellationToken) -> bool {
106 tokio::select! {
107 _ = cancel.cancelled() => false,
108 _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => true,
109 }
110 }
111
112 /// Count queued invocations, optionally filtered by task.
113 async fn count_invocations(&self, task_id: Option<&TaskId>) -> RustvelloResult<usize>;
114
115 /// Remove all queued invocations.
116 async fn purge(&self, task_id: Option<&TaskId>) -> RustvelloResult<()>;
117}