yeti_types/queue.rs
1//! Durable work queue — types shared across host components and the
2//! SDK.
3//!
4//! **Layer:** L0 — these types live in yeti-types because the unified
5//! `Context` references `QueuePayload` via the `queue_enqueue`
6//! signature.
7
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10
11// ============================================================================
12// QueuePayload — what lives in the yeti-queue `payload` field
13// ============================================================================
14
15/// The work a queue item represents. Persists in `yeti-queue` as msgpack
16/// inside the `payload` column.
17#[derive(Clone, Debug, Serialize, Deserialize)]
18#[serde(tag = "kind", rename_all = "snake_case")]
19pub enum QueuePayload {
20 /// Dispatch a registered `#[queue_fn]` in the owning wasm app.
21 Function {
22 /// Serialized (msgpack) tuple of the user function's arguments after
23 /// the leading `&Context`.
24 args: Vec<u8>,
25 /// `xxh3_64` of the canonicalized function signature. The worker rejects
26 /// the item if the currently-registered function's `sig_hash` differs
27 /// (signature changed since enqueue — fail fast, don't silently drop
28 /// or mis-interpret args).
29 fn_sig_hash: u64,
30 },
31 /// Dispatch a cross-node `fetch!()` call — routes through the existing
32 /// host reqwest bridge with the same `HttpStats` + inflight cap the
33 /// synchronous `fetch!` macro uses.
34 Fetch {
35 /// HTTP method (uppercase).
36 method: String,
37 /// Fully-qualified URL.
38 url: String,
39 /// HTTP headers, verbatim.
40 headers: HashMap<String, String>,
41 /// Request body bytes (may be empty).
42 body: Vec<u8>,
43 },
44}
45
46// ============================================================================
47// QueueFn — registry of callable functions
48// ============================================================================
49
50/// Host-side registry entry for a `#[queue_fn]` discovered across loaded
51/// wasm apps. The worker dispatches by looking up `(app_id, name)` in the
52/// `HashMap<(AppId, String), QueueFn>` that the runtime maintains.
53///
54/// The trampoline is the per-function shim the attribute macro emits: it
55/// deserializes the msgpack args tuple and calls the user function.
56#[derive(Clone)]
57pub struct QueueFn {
58 /// Canonical name (matches the user function identifier).
59 pub name: &'static str,
60 /// Signature hash — `xxh3_64` of the canonicalized signature string.
61 pub sig_hash: u64,
62 /// Trampoline: takes `&Context` + serialized args, returns the user
63 /// function's JSON result (or an error).
64 pub trampoline:
65 fn(ctx: &crate::resource::Context, args: &[u8]) -> crate::error::Result<serde_json::Value>,
66}
67
68impl std::fmt::Debug for QueueFn {
69 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70 f.debug_struct("QueueFn")
71 .field("name", &self.name)
72 .field("sig_hash", &format_args!("{:#018x}", self.sig_hash))
73 .field("trampoline", &"<fn pointer>")
74 .finish()
75 }
76}
77
78// Host-local registry: the `#[queue_fn]` proc-macro's `inventory::submit!`
79// populates this at link time for functions compiled into the host binary
80// (static plugins).
81inventory::collect!(QueueFn);
82
83/// Look up a host-local `QueueFn` by name (ignores `app_id` — static-crate
84/// functions live in the same linker table as the host binary).
85#[must_use]
86pub fn lookup_queue_fn(name: &str) -> Option<&'static QueueFn> {
87 inventory::iter::<QueueFn>().find(|f| f.name == name)
88}
89
90// ============================================================================
91// App-scoped queue-fn entry — owned wrapper around `QueueFn`
92// ============================================================================
93
94/// App-scoped queue-fn entry.
95///
96/// `name` is owned for return-type uniformity; today every entry comes
97/// out of the host's static inventory and `app_id` is always `"host"`.
98#[derive(Clone)]
99pub struct QueueFnEntry {
100 /// Owning application ID (always `"host"` today — queue fns live in
101 /// the host's static inventory; kept on the type so dispatch sites
102 /// don't churn if a per-app surface returns).
103 pub app_id: String,
104 /// Function name (owned for return-type uniformity).
105 pub name: String,
106 /// `xxh3_64` of the canonicalized signature, as produced by `#[queue_fn]`.
107 pub sig_hash: u64,
108 /// Function pointer to the trampoline.
109 pub trampoline: fn(&crate::resource::Context, &[u8]) -> crate::error::Result<serde_json::Value>,
110}
111
112impl std::fmt::Debug for QueueFnEntry {
113 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114 f.debug_struct("QueueFnEntry")
115 .field("app_id", &self.app_id)
116 .field("name", &self.name)
117 .field("sig_hash", &format_args!("{:#018x}", self.sig_hash))
118 .finish_non_exhaustive()
119 }
120}
121
122/// Look up a queue function for a specific app. Wraps the static-
123/// inventory lookup in a [`QueueFnEntry`] for uniform return shape.
124/// `app_id` is unused today (every registered `queue_fn` lives in the
125/// host's static inventory) but kept on the signature so dispatch
126/// sites don't have to change if a per-app surface returns.
127#[must_use]
128pub fn lookup_queue_fn_for_app(_app_id: &str, name: &str) -> Option<QueueFnEntry> {
129 lookup_queue_fn(name).map(|f| QueueFnEntry {
130 app_id: "host".to_owned(),
131 name: f.name.to_owned(),
132 sig_hash: f.sig_hash,
133 trampoline: f.trampoline,
134 })
135}
136
137// ============================================================================
138// Wasm-app queue dispatch hook
139// ============================================================================
140
141/// Process-wide hook for routing queue jobs into wasm components.
142///
143/// yeti-queue's dispatcher (in `crates/runtime/yeti-queue`) checks
144/// the static [`inventory`]-built native registry first via
145/// [`lookup_queue_fn_for_app`]. When that lookup misses AND a wasm
146/// invoker is installed via [`set_wasm_queue_invoker`], the
147/// dispatcher routes through here: the invoker is expected to find
148/// the wasm component for `app_id`, call its `invoke-queue-fn`
149/// export with `(name, payload)`, and return either the
150/// msgpack-encoded result or an error message.
151///
152/// Layered this way (`OnceLock<Arc<dyn ...>>` in yeti-types, impl
153/// installed from yeti-server at startup) to avoid pulling
154/// yeti-wasm-host into yeti-queue's dep tree.
155pub trait WasmQueueInvoker: Send + Sync {
156 /// Invoke a `queue_fn` on a wasm component.
157 ///
158 /// `app_id` identifies the owning app; `name` the function;
159 /// `payload` is the msgpack-encoded arg tuple. The implementor
160 /// looks up the live wasm component, calls its
161 /// `invoke-queue-fn` export, and returns the bytes the
162 /// component handed back (typically a msgpack-encoded
163 /// `serde_json::Value`).
164 ///
165 /// Returns `Err(msg)` if the app isn't a wasm app, the function
166 /// isn't registered, or the wasm invocation traps / errors.
167 fn invoke<'a>(
168 &'a self,
169 app_id: &'a str,
170 name: &'a str,
171 payload: &'a [u8],
172 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<u8>, String>> + Send + 'a>>;
173
174 /// `true` if `app_id` is a registered wasm app this invoker
175 /// knows about. Lets the dispatcher distinguish "unknown app"
176 /// (let the static-inventory miss surface as
177 /// "permanent failure") from "wasm app — try `invoke()`."
178 fn knows_app(&self, app_id: &str) -> bool;
179}
180
181/// Process-wide wasm queue invoker — installed once at startup by
182/// yeti-server.
183static WASM_QUEUE_INVOKER: std::sync::OnceLock<std::sync::Arc<dyn WasmQueueInvoker>> =
184 std::sync::OnceLock::new();
185
186/// Install the process-wide wasm queue invoker. Idempotent —
187/// subsequent calls are no-ops (`OnceLock`).
188pub fn set_wasm_queue_invoker(invoker: std::sync::Arc<dyn WasmQueueInvoker>) {
189 let _ = WASM_QUEUE_INVOKER.set(invoker);
190}
191
192/// Get the installed wasm queue invoker, if any. yeti-queue's
193/// dispatcher calls this on every Function-payload job to decide
194/// whether to route through wasm.
195#[must_use]
196pub fn wasm_queue_invoker() -> Option<&'static std::sync::Arc<dyn WasmQueueInvoker>> {
197 WASM_QUEUE_INVOKER.get()
198}
199
200#[cfg(test)]
201mod tests {
202 use super::*;
203
204 #[test]
205 fn lookup_for_app_falls_back_to_static_inventory() {
206 // No `#[queue_fn]` named `never_registered_fn` exists in the
207 // static inventory; the lookup misses and returns None.
208 let app = format!("test-app-missing-{}", std::process::id());
209 assert!(lookup_queue_fn_for_app(&app, "never_registered_fn").is_none());
210 }
211}
212
213// `#[schedule(every = "...")]` types moved to the `yeti-schedule` crate.
214// Queue and schedule are sibling concepts (the schedule registry enqueues
215// queue jobs at fire-time, but no runtime logic is shared), so the type
216// surfaces live in independent foundation crates.
217
218// The plan originally defined a typed `QueueEvent` broadcast channel at
219// 1024 capacity. The shipped worker uses the default table-pubsub
220// channel (`SubscriptionMessage`, capacity 256) — every mutation of
221// `yeti-queue` flows through there automatically, and the worker
222// scans the row rather than decoding an event payload. The typed
223// `QueueEvent` is therefore not needed: keeping it as dead code invites
224// drift. Re-introduce it if a future consumer genuinely needs typed
225// event payloads without reading the table.