Skip to main content

pylon_functions/
protocol.rs

//! Bidirectional NDJSON protocol between the Rust runtime and the TypeScript process.
//!
//! Messages are newline-delimited JSON objects. Each function invocation gets
//! a unique `call_id` for multiplexing concurrent calls over a single connection.
5
6use serde::{Deserialize, Serialize};
7
8// ---------------------------------------------------------------------------
9// Rust → TypeScript messages
10// ---------------------------------------------------------------------------
11
12/// Invoke a function on the TypeScript side.
13#[derive(Debug, Clone, Serialize)]
14pub struct CallMessage {
15    #[serde(rename = "type")]
16    pub msg_type: &'static str, // always "call"
17    pub call_id: String,
18    pub fn_name: String,
19    pub fn_type: FnType,
20    pub args: serde_json::Value,
21    pub auth: AuthInfo,
22    /// HTTP request context — present only when the action is invoked via
23    /// a custom HTTP route (`defineRoute` binding). Actions called from
24    /// other actions via `ctx.runAction` or from jobs don't get this.
25    /// Enables Stripe-webhook-style signature verification + access to
26    /// raw headers/body the router would otherwise discard.
27    #[serde(skip_serializing_if = "Option::is_none")]
28    pub request: Option<RequestInfo>,
29}
30
31/// HTTP request metadata forwarded to TypeScript actions invoked via
32/// `defineRoute` bindings. All fields are strings so the TS side can use
33/// them directly without re-parsing.
34#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct RequestInfo {
36    /// Uppercased method — `POST`, `GET`, etc.
37    pub method: String,
38    /// Full request path (with query string if any).
39    pub path: String,
40    /// Lowercased header names → values. Multi-value headers are joined
41    /// with `, ` per RFC 7230. This trades some fidelity for a map shape
42    /// that's ergonomic to consume from TS.
43    pub headers: std::collections::HashMap<String, String>,
44    /// The exact bytes of the request body, UTF-8-decoded. Webhook
45    /// signature verification (Stripe, GitHub) needs the bytes that were
46    /// signed, so this is NOT the parsed JSON.
47    pub raw_body: String,
48}
49
50impl CallMessage {
51    pub fn new(
52        call_id: String,
53        fn_name: String,
54        fn_type: FnType,
55        args: serde_json::Value,
56        auth: AuthInfo,
57    ) -> Self {
58        Self {
59            msg_type: "call",
60            call_id,
61            fn_name,
62            fn_type,
63            args,
64            auth,
65            request: None,
66        }
67    }
68
69    /// Attach HTTP request metadata (used when the call originated from a
70    /// `defineRoute` HTTP binding rather than a programmatic invocation).
71    pub fn with_request(mut self, request: RequestInfo) -> Self {
72        self.request = Some(request);
73        self
74    }
75}
76
77/// Result of a DB operation, sent back to TypeScript.
78///
79/// `op_id` is echoed from the incoming `DbOpMessage.op_id` when present.
80/// The TS runtime uses it to demux concurrent DB ops inside a single
81/// function call (e.g. `Promise.all([ctx.db.get(a), ctx.db.get(b)])`).
82/// Absent `op_id` keeps legacy TS runtimes compatible.
83#[derive(Debug, Clone, Serialize)]
84pub struct DbResultMessage {
85    #[serde(rename = "type")]
86    pub msg_type: &'static str, // always "result"
87    pub call_id: String,
88    #[serde(skip_serializing_if = "Option::is_none")]
89    pub op_id: Option<String>,
90    #[serde(skip_serializing_if = "Option::is_none")]
91    pub data: Option<serde_json::Value>,
92    #[serde(skip_serializing_if = "Option::is_none")]
93    pub error: Option<ErrorInfo>,
94}
95
96impl DbResultMessage {
97    pub fn ok(call_id: String, data: serde_json::Value) -> Self {
98        Self {
99            msg_type: "result",
100            call_id,
101            op_id: None,
102            data: Some(data),
103            error: None,
104        }
105    }
106
107    pub fn ok_with_op(call_id: String, op_id: Option<String>, data: serde_json::Value) -> Self {
108        Self {
109            msg_type: "result",
110            call_id,
111            op_id,
112            data: Some(data),
113            error: None,
114        }
115    }
116
117    pub fn err(call_id: String, code: &str, message: &str) -> Self {
118        Self {
119            msg_type: "result",
120            call_id,
121            op_id: None,
122            data: None,
123            error: Some(ErrorInfo {
124                code: code.to_string(),
125                message: message.to_string(),
126            }),
127        }
128    }
129
130    pub fn err_with_op(call_id: String, op_id: Option<String>, code: &str, message: &str) -> Self {
131        Self {
132            msg_type: "result",
133            call_id,
134            op_id,
135            data: None,
136            error: Some(ErrorInfo {
137                code: code.to_string(),
138                message: message.to_string(),
139            }),
140        }
141    }
142}
143
144// ---------------------------------------------------------------------------
145// TypeScript → Rust messages
146// ---------------------------------------------------------------------------
147
148/// A message from the TypeScript handler back to Rust.
149#[derive(Debug, Clone, Deserialize)]
150#[serde(tag = "type")]
151pub enum TsMessage {
152    /// DB operation request.
153    #[serde(rename = "db")]
154    Db(DbOpMessage),
155
156    /// Stream a chunk to the HTTP client (SSE).
157    #[serde(rename = "stream")]
158    Stream(StreamChunkMessage),
159
160    /// Schedule a function for later execution.
161    #[serde(rename = "schedule")]
162    Schedule(ScheduleMessage),
163
164    /// Cancel a previously scheduled function.
165    #[serde(rename = "cancel_schedule")]
166    CancelSchedule(CancelScheduleMessage),
167
168    /// Call another function (for actions calling queries/mutations).
169    #[serde(rename = "run_fn")]
170    RunFn(RunFnMessage),
171
172    /// Function completed successfully.
173    #[serde(rename = "return")]
174    Return(ReturnMessage),
175
176    /// Function failed with an error.
177    #[serde(rename = "error")]
178    Error(ErrorMessage),
179
180    /// Initial handshake from the runtime: the list of functions it loaded.
181    /// Sent once at startup before any other message.
182    #[serde(rename = "ready")]
183    Ready(ReadyMessage),
184}
185
186/// Handshake payload from the TS runtime.
187#[derive(Debug, Clone, Deserialize)]
188pub struct ReadyMessage {
189    #[serde(default)]
190    pub functions: Vec<crate::registry::FnDef>,
191    #[serde(default)]
192    pub error: Option<String>,
193}
194
195/// A database operation request from TypeScript.
196#[derive(Debug, Clone, Deserialize)]
197pub struct DbOpMessage {
198    pub call_id: String,
199    /// Optional per-RPC id minted by the TS side. When present, the Rust
200    /// reply echoes it back on `DbResultMessage.op_id` so the TS runtime
201    /// can demux concurrent DB ops from a single handler (Promise.all).
202    /// Legacy TS runtimes that don't send op_id keep the old behavior:
203    /// only one in-flight RPC per call_id at a time.
204    #[serde(default)]
205    pub op_id: Option<String>,
206    pub op: DbOp,
207    pub entity: String,
208    #[serde(default)]
209    pub id: Option<String>,
210    #[serde(default)]
211    pub data: Option<serde_json::Value>,
212    #[serde(default)]
213    pub field: Option<String>,
214    #[serde(default)]
215    pub value: Option<String>,
216    #[serde(default)]
217    pub relation: Option<String>,
218    #[serde(default)]
219    pub target_id: Option<String>,
220    /// Cursor pagination — `paginate` op only. Opaque id-after cursor.
221    #[serde(default)]
222    pub after: Option<String>,
223    /// Cursor pagination — `paginate` op only. Requested page size.
224    #[serde(default)]
225    pub limit: Option<u32>,
226}
227
228/// Database operations available to TypeScript functions.
229#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
230#[serde(rename_all = "snake_case")]
231pub enum DbOp {
232    Get,
233    List,
234    /// Cursor-paginated list. Uses `after` + `limit` on [`DbOpMessage`].
235    /// Response shape: `{ page: Row[], nextCursor: string | null, isDone: bool }`.
236    Paginate,
237    Insert,
238    Update,
239    Delete,
240    Lookup,
241    Query,
242    QueryGraph,
243    Link,
244    Unlink,
245    /// Faceted full-text search. The query body is whatever the entity
246    /// declares in its `search:` config — query string, filters,
247    /// facets, sort, page, pageSize. Carried on `data`.
248    /// Response shape: `{ hits, facetCounts, total, tookMs }`.
249    Search,
250}
251
252/// A stream chunk to forward to the HTTP client as SSE.
253#[derive(Debug, Clone, Deserialize)]
254pub struct StreamChunkMessage {
255    pub call_id: String,
256    pub data: String,
257    /// Optional event type for SSE (defaults to "message").
258    #[serde(default)]
259    pub event: Option<String>,
260}
261
262/// Schedule a function for future execution.
263#[derive(Debug, Clone, Deserialize)]
264pub struct ScheduleMessage {
265    pub call_id: String,
266    pub fn_name: String,
267    pub args: serde_json::Value,
268    /// Run after this many milliseconds.
269    #[serde(default)]
270    pub delay_ms: Option<u64>,
271    /// Run at this Unix timestamp (ms since epoch).
272    #[serde(default)]
273    pub run_at: Option<u64>,
274}
275
276/// Cancel a scheduled function.
277#[derive(Debug, Clone, Deserialize)]
278pub struct CancelScheduleMessage {
279    pub call_id: String,
280    pub schedule_id: String,
281}
282
283/// Call another function from within an action.
284#[derive(Debug, Clone, Deserialize)]
285pub struct RunFnMessage {
286    pub call_id: String,
287    pub fn_name: String,
288    pub fn_type: FnType,
289    pub args: serde_json::Value,
290}
291
292/// Function returned successfully.
293#[derive(Debug, Clone, Deserialize)]
294pub struct ReturnMessage {
295    pub call_id: String,
296    pub value: serde_json::Value,
297}
298
299/// Function failed.
300#[derive(Debug, Clone, Deserialize)]
301pub struct ErrorMessage {
302    pub call_id: String,
303    pub code: String,
304    pub message: String,
305}
306
307// ---------------------------------------------------------------------------
308// Shared types
309// ---------------------------------------------------------------------------
310
311/// Function type.
312#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
313#[serde(rename_all = "snake_case")]
314pub enum FnType {
315    Query,
316    Mutation,
317    Action,
318}
319
320/// Auth context passed to function handlers.
321///
322/// Mirrors the three fields of the runtime's `AuthContext` that a
323/// mutation can legitimately read: the authenticated user id, admin
324/// flag, and active tenant. Functions that gate on `ctx.auth.tenantId`
325/// (anything org-scoped in a B2B app) need the last one forwarded — it's
326/// easy to forget and catches out every new multi-tenant example until
327/// someone hits "why is my session_tenant always null inside functions".
328#[derive(Debug, Clone, Serialize, Deserialize)]
329pub struct AuthInfo {
330    #[serde(skip_serializing_if = "Option::is_none")]
331    pub user_id: Option<String>,
332    pub is_admin: bool,
333    #[serde(default, skip_serializing_if = "Option::is_none")]
334    pub tenant_id: Option<String>,
335}
336
337/// Error info in protocol messages.
338#[derive(Debug, Clone, Serialize, Deserialize)]
339pub struct ErrorInfo {
340    pub code: String,
341    pub message: String,
342}
343
#[cfg(test)]
mod tests {
    use super::*;

    /// Decodes one JSON line into a [`TsMessage`], panicking on failure.
    fn decode(line: &str) -> TsMessage {
        serde_json::from_str(line).unwrap()
    }

    /// Encodes a serializable message to JSON text, panicking on failure.
    fn encode<T: serde::Serialize>(msg: &T) -> String {
        serde_json::to_string(msg).unwrap()
    }

    // --- Rust → TypeScript: serialization -------------------------------

    #[test]
    fn call_message_serializes() {
        let auth = AuthInfo {
            user_id: Some("user_1".into()),
            is_admin: false,
            tenant_id: None,
        };
        let args = serde_json::json!({"lotId": "lot_1", "amount": 100});
        let wire = encode(&CallMessage::new(
            "c1".into(),
            "placeBid".into(),
            FnType::Mutation,
            args,
            auth,
        ));
        assert!(wire.contains("\"type\":\"call\""));
        assert!(wire.contains("\"fn_type\":\"mutation\""));
    }

    // --- TypeScript → Rust: deserialization -----------------------------

    #[test]
    fn ts_message_deserializes_db_op() {
        let line = r#"{"type":"db","call_id":"c1","op":"get","entity":"Lot","id":"lot_1"}"#;
        if let TsMessage::Db(op) = decode(line) {
            assert_eq!(op.call_id, "c1");
            assert_eq!(op.op, DbOp::Get);
            assert_eq!(op.entity, "Lot");
            assert_eq!(op.id.as_deref(), Some("lot_1"));
        } else {
            panic!("expected Db message");
        }
    }

    #[test]
    fn ts_message_deserializes_stream() {
        let line = r#"{"type":"stream","call_id":"c1","data":"hello"}"#;
        if let TsMessage::Stream(chunk) = decode(line) {
            assert_eq!(chunk.data, "hello");
            // `event` was not sent, so it must fall back to `None`.
            assert!(chunk.event.is_none());
        } else {
            panic!("expected Stream message");
        }
    }

    #[test]
    fn ts_message_deserializes_return() {
        let line = r#"{"type":"return","call_id":"c1","value":{"ok":true}}"#;
        if let TsMessage::Return(ret) = decode(line) {
            assert_eq!(ret.value, serde_json::json!({"ok": true}));
        } else {
            panic!("expected Return message");
        }
    }

    #[test]
    fn ts_message_deserializes_schedule() {
        let line = r#"{"type":"schedule","call_id":"c1","fn_name":"closeLot","args":{"lotId":"x"},"delay_ms":5000}"#;
        if let TsMessage::Schedule(sched) = decode(line) {
            assert_eq!(sched.fn_name, "closeLot");
            assert_eq!(sched.delay_ms, Some(5000));
        } else {
            panic!("expected Schedule message");
        }
    }

    // --- DB result constructors -----------------------------------------

    #[test]
    fn db_result_ok() {
        let wire = encode(&DbResultMessage::ok(
            "c1".into(),
            serde_json::json!({"id": "x"}),
        ));
        assert!(wire.contains("\"type\":\"result\""));
        // A success reply must not carry an `error` key at all.
        assert!(!wire.contains("\"error\""));
    }

    #[test]
    fn db_result_err() {
        let wire = encode(&DbResultMessage::err("c1".into(), "NOT_FOUND", "not found"));
        assert!(wire.contains("\"error\""));
        // A failure reply must not carry a `data` key at all.
        assert!(!wire.contains("\"data\""));
    }
}