synwire_core/runnables/core.rs
1//! Core trait for all runnable components.
2//!
3//! # Design Decision: `serde_json::Value` as Universal I/O
4//!
5//! `RunnableCore` uses `serde_json::Value` for input/output rather than generic
6//! type parameters (`RunnableCore<I, O>`) because:
7//!
//! - **Object safety**: with generic type parameters, every
//!   `RunnableCore<I, O>` instantiation would be a distinct trait-object type,
//!   so heterogeneous chains could not be stored in a single
//!   `Vec<Box<dyn RunnableCore>>`.
10//! - **Composability**: any runnable can be chained with any other without
11//! explicit type conversion boilerplate.
12//! - **Trade-off**: runtime type checking instead of compile-time, but this
13//! matches `LangChain` Python's dynamic typing model.
14//! - **Alternative considered**: `Cow<'_, Value>` for zero-copy reads, but the
15//! ergonomic cost was deemed too high for the typical use case.
16
17use crate::error::SynwireError;
18use crate::runnables::config::RunnableConfig;
19use crate::{BoxFuture, BoxStream};
20
21/// Core trait for all runnable components in a chain.
22///
23/// Uses `serde_json::Value` as the universal input/output type
24/// for composability between heterogeneous runnables.
25///
26/// # Default implementations
27///
28/// - [`batch`](RunnableCore::batch) invokes sequentially over each input.
29/// - [`stream`](RunnableCore::stream) wraps [`invoke`](RunnableCore::invoke)
30/// as a single-item stream.
31///
32/// # Lifetime parameter
33///
34/// All async methods share a single lifetime `'a` that ties together
35/// `&'a self` and `Option<&'a RunnableConfig>`, ensuring the returned
36/// future can borrow both.
37///
38/// # Cancel safety
39///
40/// [`invoke`](Self::invoke) and [`batch`](Self::batch) are **not
41/// cancel-safe** in general -- the behaviour depends on the concrete
42/// implementation. Dropping `batch` mid-execution may leave earlier
43/// inputs processed but later inputs unprocessed. The [`BoxStream`]
44/// returned by [`stream`](Self::stream) can be safely dropped at any
45/// point.
46pub trait RunnableCore: Send + Sync {
47 /// Invoke the runnable with a single input.
48 fn invoke<'a>(
49 &'a self,
50 input: serde_json::Value,
51 config: Option<&'a RunnableConfig>,
52 ) -> BoxFuture<'a, Result<serde_json::Value, SynwireError>>;
53
54 /// Invoke on multiple inputs. Default implementation calls
55 /// [`invoke`](RunnableCore::invoke) sequentially for each input.
56 fn batch<'a>(
57 &'a self,
58 inputs: Vec<serde_json::Value>,
59 config: Option<&'a RunnableConfig>,
60 ) -> BoxFuture<'a, Result<Vec<serde_json::Value>, SynwireError>> {
61 Box::pin(async move {
62 let mut results = Vec::with_capacity(inputs.len());
63 for input in inputs {
64 results.push(self.invoke(input, config).await?);
65 }
66 Ok(results)
67 })
68 }
69
70 /// Stream results. Default implementation wraps [`invoke`](RunnableCore::invoke)
71 /// as a single-item stream.
72 fn stream<'a>(
73 &'a self,
74 input: serde_json::Value,
75 config: Option<&'a RunnableConfig>,
76 ) -> BoxFuture<'a, Result<BoxStream<'a, Result<serde_json::Value, SynwireError>>, SynwireError>>
77 {
78 Box::pin(async move {
79 let result = self.invoke(input, config).await;
80 let stream: BoxStream<'a, Result<serde_json::Value, SynwireError>> =
81 Box::pin(futures_util::stream::iter(vec![result]));
82 Ok(stream)
83 })
84 }
85
86 /// Get the runnable's name for debugging.
87 #[allow(clippy::unnecessary_literal_bound)]
88 fn name(&self) -> &str {
89 "RunnableCore"
90 }
91}