caravan_rpc/lib.rs
1//! Runtime SDK for the [Caravan](https://github.com/paulxiep/caravan)
2//! application-definition compiler.
3//!
4//! A user declares a seam-interface trait, marks it with [`wagon`], registers a
5//! concrete implementation via [`provide`], and dispatches through [`client`].
6//! Dispatch mode (inproc / http / lambda) is read from the
7//! `CARAVAN_RPC_PEERS` env var at the call site; when the env var is unset,
8//! `client::<dyn I>()` returns the registered `Arc<dyn I>` directly with no
9//! overhead (no-config inertness).
10//!
11//! See <https://github.com/paulxiep/caravan/blob/main/docs/poc_rpc_sdk.md> for
12//! the wire contract and per-language surface.
13//!
14//! # M2 status
15//!
16//! 0.1.0 ships the runtime building blocks: codec (`codec`), peer-table
17//! parsing (`peers`), error types (`errors`), HTTP client dispatchers
//! (`dispatch`, behind the `client` feature). The proc-macro that turns
//! `#[wagon]` into server + client adapters is the M2 Session 3+ deliverable.
//! `client::<dyn T>()` returns the inproc-registered impl unless a
//! macro-generated [`HttpAdapterFactory`] exists for `T` and the peer table
//! selects `http` for its interface — switching to HTTP depends on that
//! per-trait HTTP adapter discovery.
23//!
24//! Lambda mode panics with an M7 pointer (forward-compat marker).
25//!
26//! ```ignore
27//! use std::sync::Arc;
28//! use caravan_rpc::{wagon, provide, client};
29//!
30//! #[wagon]
31//! pub trait Embedder: Send + Sync {
32//! fn embed(&self, text: &str) -> Vec<f32>;
33//! }
34//!
35//! struct InMemoryEmbedder;
36//! impl Embedder for InMemoryEmbedder {
37//! fn embed(&self, _text: &str) -> Vec<f32> { vec![0.0; 8] }
38//! }
39//!
40//! // startup
41//! provide::<dyn Embedder>(Arc::new(InMemoryEmbedder));
42//!
43//! // call site
44//! let v = client::<dyn Embedder>().embed("hello");
45//! assert_eq!(v.len(), 8);
46//! ```
47
48#![forbid(unsafe_code)]
49
50use std::any::{Any, TypeId};
51use std::collections::HashMap;
52use std::sync::{Arc, OnceLock, RwLock};
53
54pub use caravan_rpc_macros::wagon;
55
56pub mod codec;
57pub mod errors;
58pub mod peers;
59
60#[cfg(feature = "client")]
61pub mod dispatch;
62
63#[cfg(feature = "server")]
64pub mod server;
65
66pub use errors::{RpcError, RpcRemoteError, RpcTransportError};
67pub use peers::{PeerEntry, peer_for};
68
69/// Internal re-exports for use by `#[wagon]`-generated code only. Lets the
70/// user's crate depend solely on `caravan-rpc`; the macro reaches in here
71/// rather than spelling `::serde_json::...` / `::axum::...` (which would
72/// require the user to add those crates to their own Cargo.toml).
73///
74/// Not a stable public API — names may change without notice.
75#[doc(hidden)]
76pub mod __macro_support {
77 pub use async_trait;
78 #[cfg(feature = "server")]
79 pub use axum;
80 pub use inventory;
81 pub use serde_json;
82}
83
/// Factory entry for a `#[wagon]` trait's HTTP client adapter.
///
/// Macro-generated code submits one of these per full-codegen trait via
/// `inventory::submit!` so the SDK can discover the trait-specific
/// HttpClient constructor at runtime, indexed by `TypeId`.
///
/// `construct` returns the HttpClient wrapped as `Arc<dyn T>` then erased
/// into `Box<dyn Any + Send + Sync>` (because `Arc<dyn T>: Any` for
/// `T: 'static`). The SDK downcasts the box back to `Arc<T>` inside
/// `try_client::<T>()` (which `client::<T>()` delegates to).
///
/// The type is `Copy`: every field is either a `&'static str` or a plain
/// function pointer, so entries can be passed around and logged freely.
#[derive(Debug, Clone, Copy)]
pub struct HttpAdapterFactory {
    /// Interface name; used as the key when consulting the peer table.
    pub interface_name: &'static str,
    /// Returns the `TypeId` of the trait object this factory serves. Lookup
    /// compares this against `TypeId::of::<T>()` for the requested `T`.
    pub type_id_fn: fn() -> std::any::TypeId,
    /// Builds the HTTP adapter for the peer reachable at `url`. The returned
    /// box must contain exactly an `Arc<dyn T>` for the trait identified by
    /// `type_id_fn`; anything else makes the SDK's downcast panic.
    pub construct: fn(url: String) -> Box<dyn Any + Send + Sync>,
}
98
99inventory::collect!(HttpAdapterFactory);
100
101fn lookup_http_factory<T: ?Sized + 'static>() -> Option<&'static HttpAdapterFactory> {
102 let want = TypeId::of::<T>();
103 inventory::iter::<HttpAdapterFactory>
104 .into_iter()
105 .find(|f| (f.type_id_fn)() == want)
106}
107
/// Factory entry for a `#[wagon]` trait's server-side router. Mirrors
/// [`HttpAdapterFactory`] but for the server direction.
///
/// Macro-generated code submits one of these per full-codegen trait via
/// `inventory::submit!`. [`run_or_serve`] iterates this collection by
/// interface name to find the right router builder when starting in
/// peer mode.
///
/// `build_router_from_registry` is macro-emitted and does the
/// trait-erased work of: `try_client::<dyn Trait>()` for the impl,
/// then `build_<trait>_router(impl)` to produce the axum router.
///
/// The type is `Copy`: both fields are `'static` data (a string slice and a
/// function pointer).
#[cfg(feature = "server")]
#[derive(Debug, Clone, Copy)]
pub struct HttpServerFactory {
    /// Interface name matched against the `peer-<InterfaceName>` suffix of
    /// `CARAVAN_RPC_ROLE`.
    pub interface_name: &'static str,
    /// Builds the router for this interface, or returns a static message
    /// describing why it could not be built.
    pub build_router_from_registry:
        fn() -> Result<crate::__macro_support::axum::Router, &'static str>,
}

#[cfg(feature = "server")]
inventory::collect!(HttpServerFactory);
128
/// Run the user's main, OR start a peer HTTP server, based on the
/// `CARAVAN_RPC_ROLE` env var.
///
/// **Inertness**: when the env var is unset, empty, or does not start with
/// `peer-`, this just awaits `user_main` and returns its result — no
/// overhead, no behavior change.
///
/// **Peer mode**: when `CARAVAN_RPC_ROLE=peer-<InterfaceName>`,
/// `user_main` is NOT called. Instead, the SDK:
///   1. Looks up the macro-emitted [`HttpServerFactory`] for the named
///      interface (via inventory).
///   2. Calls `build_router_from_registry` which (a) finds the
///      `provide()`-registered impl in the inproc registry and (b)
///      builds the axum router using the macro-generated
///      `build_<trait>_router(impl)`.
///   3. Binds on `CARAVAN_RPC_BIND_ADDR` (default `0.0.0.0:8080` when the
///      variable is unset or empty) and `serve_forever`s.
///
/// Caller contract: the user's setup code (including `provide()` calls
/// for all #[wagon] traits) must run BEFORE `run_or_serve` is awaited.
/// Typical pattern:
///
/// ```ignore
/// #[tokio::main]
/// async fn main() -> Result<()> {
///     let state = AppState::from_config(...).await?;  // calls provide() inside
///     caravan_rpc::run_or_serve(|| async move {
///         // user's normal app startup — only runs in non-peer mode.
///         run_chat_server(state).await
///     }).await
/// }
/// ```
///
/// # Panics
///
/// In peer mode only:
/// * no [`HttpServerFactory`] is registered for the requested interface;
/// * the factory fails to build its router;
/// * `CARAVAN_RPC_BIND_ADDR` is set to a non-empty value that does not parse
///   as a `SocketAddr`;
/// * `serve_forever` returns an error.
#[cfg(feature = "server")]
pub async fn run_or_serve<F, Fut>(user_main: F) -> Result<(), RpcError>
where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = Result<(), RpcError>>,
{
    let role = std::env::var("CARAVAN_RPC_ROLE").unwrap_or_default();
    let Some(iface_name) = role.strip_prefix("peer-") else {
        // Not a peer process: behave exactly as if the SDK were absent.
        return user_main().await;
    };

    let factory = inventory::iter::<HttpServerFactory>
        .into_iter()
        .find(|f| f.interface_name == iface_name)
        .unwrap_or_else(|| {
            panic!(
                "caravan-rpc: CARAVAN_RPC_ROLE={role:?} but no HttpServerFactory \
                 registered for interface {iface_name:?}. Did you mark the trait \
                 with #[wagon] and have your impl crate compiled into this binary?"
            )
        });
    let router = (factory.build_router_from_registry)().unwrap_or_else(|msg| {
        panic!("caravan-rpc: peer {iface_name} failed to build router: {msg}")
    });

    // An empty value is treated like an unset one (mirrors how an empty
    // CARAVAN_RPC_ROLE is handled) instead of dying on an opaque parse error.
    let raw_addr = std::env::var("CARAVAN_RPC_BIND_ADDR")
        .ok()
        .filter(|value| !value.is_empty())
        .unwrap_or_else(|| "0.0.0.0:8080".to_string());
    // Surface the offending value so a misconfigured deployment is easy to spot.
    let addr: std::net::SocketAddr = raw_addr.parse().unwrap_or_else(|err| {
        panic!("CARAVAN_RPC_BIND_ADDR must parse as SocketAddr: {raw_addr:?} ({err})")
    });

    eprintln!("caravan peer {iface_name} serving on {addr}");
    server::serve_forever(addr, router)
        .await
        .expect("serve_forever returned error");
    Ok(())
}
194
/// Version of this crate, captured from `CARGO_PKG_VERSION` at compile time.
/// Also embedded in the Lambda-dispatch panic message raised by `try_client`.
pub const VERSION: &str = env!("CARGO_PKG_VERSION");
197
/// Process-global inproc registry mapping a seam trait's [`TypeId`] to its
/// `Arc<dyn T>` impl.
///
/// Stored as `Box<dyn Any + Send + Sync>` so we can key by any trait object's
/// `TypeId`. The stored value is always an `Arc<T>` (with `T: ?Sized`); the
/// downcast on the lookup side reconstructs that exact type.
type Registry = RwLock<HashMap<TypeId, Box<dyn Any + Send + Sync>>>;

/// Access the lazily-initialised global [`Registry`].
fn registry() -> &'static Registry {
    static R: OnceLock<Registry> = OnceLock::new();
    R.get_or_init(|| RwLock::new(HashMap::new()))
}

/// Register `impl_` as the inproc provider for trait object `T`.
///
/// Call once per process at startup (worker entry, CLI `main()`) before any
/// `client::<dyn T>()` call. Re-registering an interface replaces the prior
/// impl (last-write-wins) — intentional for test isolation; production code
/// should call `provide` once per interface.
///
/// The replaced impl (if any) is dropped only AFTER the registry lock has
/// been released, so an impl whose `Drop` touches the registry cannot
/// deadlock, and a panicking `Drop` cannot poison the registry.
///
/// ```ignore
/// provide::<dyn Embedder>(Arc::new(FastEmbedImpl::new()?));
/// ```
///
/// # Panics
///
/// Panics if the registry lock is poisoned.
pub fn provide<T: ?Sized + Send + Sync + 'static>(impl_: Arc<T>) {
    let entry: Box<dyn Any + Send + Sync> = Box::new(impl_);

    let mut guard = registry().write().expect("caravan-rpc registry poisoned");
    let replaced = guard.insert(TypeId::of::<T>(), entry);
    // Release the write lock before any user `Drop` code can run.
    drop(guard);
    drop(replaced);
}
228
229/// Return an `Arc<dyn T>` to dispatch through.
230///
231/// Behavior depends on `CARAVAN_RPC_PEERS[interface]`:
232/// * Unset or `inproc` → the locally `provide()`-ed impl (zero-overhead).
233/// * `http` AND the trait was full-codegen-expanded by `#[wagon]` (so an
234/// inventory factory exists) → an `Arc<<Trait>HttpClient>` whose every
235/// method call goes over the wire.
236/// * `http` but no inventory factory (e.g., `#[wagon(identity)]` trait) →
237/// falls back to the local impl. Logged once at startup so misconfigs
238/// are visible. Documented limitation: identity-marked traits don't
239/// honor mode flips.
240/// * `lambda` → panic with M7 pointer.
241///
242/// Panics if no impl is registered AND no http factory exists for `T`.
243/// Use [`try_client`] for optional seams.
244pub fn client<T: ?Sized + Send + Sync + 'static>() -> Arc<T> {
245 try_client::<T>().unwrap_or_else(|| {
246 panic!(
247 "no impl registered for type {}; call provide::<{}>(Arc::new(impl)) at startup",
248 std::any::type_name::<T>(),
249 std::any::type_name::<T>()
250 )
251 })
252}
253
254/// Return an `Arc<dyn T>` to dispatch through, or `None` if no impl is
255/// available (neither locally `provide()`-ed nor wired via HTTP through
256/// `#[wagon]`'s inventory factory).
257///
258/// Use this for seams that are conditionally enabled at runtime (e.g. an
259/// optional reranker). For seams that must always be present, prefer the
260/// panicking [`client`] for a clearer error message at startup.
261///
262/// Dispatch-mode selection mirrors [`client`]: HTTP mode + an inventory
263/// factory → returns the macro-generated `<Trait>HttpClient`; otherwise
264/// → returns the registered local impl (inproc).
265pub fn try_client<T: ?Sized + Send + Sync + 'static>() -> Option<Arc<T>> {
266 // 1. If an HTTP factory exists for T (i.e., the trait was full-
267 // codegen-expanded by `#[wagon]`), consult the peer table.
268 if let Some(factory) = lookup_http_factory::<T>() {
269 match peer_for(factory.interface_name) {
270 Some(PeerEntry::Http { url }) => {
271 let boxed = (factory.construct)(url);
272 return Some(
273 *boxed
274 .downcast::<Arc<T>>()
275 .expect("caravan-rpc: HttpAdapterFactory.construct returned wrong type"),
276 );
277 }
278 Some(PeerEntry::Lambda { .. }) => {
279 panic!(
280 "caravan-rpc {VERSION}: Lambda dispatch for interface {:?} lands at M7 \
281 (see caravan/docs/development_plan.md).",
282 factory.interface_name
283 );
284 }
285 // Inproc or absent → fall through to local registry lookup.
286 _ => {}
287 }
288 }
289
290 // 2. Local registry lookup. Covers inproc mode + identity-marked
291 // traits + http mode without factory + lambda without factory
292 // (the last two are correctness-acceptable per `client` docs).
293 let g = registry().read().expect("caravan-rpc registry poisoned");
294 g.get(&TypeId::of::<T>()).map(|entry| {
295 entry
296 .downcast_ref::<Arc<T>>()
297 .expect("caravan-rpc registry type mismatch (internal bug)")
298 .clone()
299 })
300}
301
302/// Whether an impl has been registered for trait object `T`.
303///
304/// Slightly cheaper than [`try_client`] when the caller doesn't need the impl
305/// itself (e.g. health checks). Subject to TOCTOU — prefer `try_client` in
306/// dispatch paths.
307pub fn is_provided<T: ?Sized + Send + Sync + 'static>() -> bool {
308 let g = registry().read().expect("caravan-rpc registry poisoned");
309 g.contains_key(&TypeId::of::<T>())
310}
311
312/// Reset the registry. Intended for test isolation only; production code
313/// should `provide()` once and leave the registry alone for the process
314/// lifetime.
315#[doc(hidden)]
316pub fn __clear_registry_for_tests() {
317 let mut g = registry().write().expect("caravan-rpc registry poisoned");
318 g.clear();
319}