Skip to main content

pylon_functions/
protocol.rs

1//! Bidirectional NDJSON protocol between Rust runtime and TypeScript process.
2//!
3//! Messages are newline-delimited JSON objects. Each function invocation gets
4//! a unique `call_id` for multiplexing concurrent calls over a single connection.
5
6use serde::{Deserialize, Serialize};
7
8// ---------------------------------------------------------------------------
9// Rust → TypeScript messages
10// ---------------------------------------------------------------------------
11
12/// Invoke a function on the TypeScript side.
13#[derive(Debug, Clone, Serialize)]
14pub struct CallMessage {
15    #[serde(rename = "type")]
16    pub msg_type: &'static str, // always "call"
17    pub call_id: String,
18    pub fn_name: String,
19    pub fn_type: FnType,
20    pub args: serde_json::Value,
21    pub auth: AuthInfo,
22    /// HTTP request context — present only when the action is invoked via
23    /// a custom HTTP route (`defineRoute` binding). Actions called from
24    /// other actions via `ctx.runAction` or from jobs don't get this.
25    /// Enables Stripe-webhook-style signature verification + access to
26    /// raw headers/body the router would otherwise discard.
27    #[serde(skip_serializing_if = "Option::is_none")]
28    pub request: Option<RequestInfo>,
29}
30
31/// HTTP request metadata forwarded to TypeScript actions invoked via
32/// `defineRoute` bindings. All fields are strings so the TS side can use
33/// them directly without re-parsing.
34#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct RequestInfo {
36    /// Uppercased method — `POST`, `GET`, etc.
37    pub method: String,
38    /// Full request path (with query string if any).
39    pub path: String,
40    /// Lowercased header names → values. Multi-value headers are joined
41    /// with `, ` per RFC 7230. This trades some fidelity for a map shape
42    /// that's ergonomic to consume from TS.
43    pub headers: std::collections::HashMap<String, String>,
44    /// The exact bytes of the request body, UTF-8-decoded. Webhook
45    /// signature verification (Stripe, GitHub) needs the bytes that were
46    /// signed, so this is NOT the parsed JSON.
47    pub raw_body: String,
48}
49
50impl CallMessage {
51    pub fn new(
52        call_id: String,
53        fn_name: String,
54        fn_type: FnType,
55        args: serde_json::Value,
56        auth: AuthInfo,
57    ) -> Self {
58        Self {
59            msg_type: "call",
60            call_id,
61            fn_name,
62            fn_type,
63            args,
64            auth,
65            request: None,
66        }
67    }
68
69    /// Attach HTTP request metadata (used when the call originated from a
70    /// `defineRoute` HTTP binding rather than a programmatic invocation).
71    pub fn with_request(mut self, request: RequestInfo) -> Self {
72        self.request = Some(request);
73        self
74    }
75}
76
77/// Result of a DB operation, sent back to TypeScript.
78///
79/// `op_id` is echoed from the incoming `DbOpMessage.op_id` when present.
80/// The TS runtime uses it to demux concurrent DB ops inside a single
81/// function call (e.g. `Promise.all([ctx.db.get(a), ctx.db.get(b)])`).
82/// Absent `op_id` keeps legacy TS runtimes compatible.
83#[derive(Debug, Clone, Serialize)]
84pub struct DbResultMessage {
85    #[serde(rename = "type")]
86    pub msg_type: &'static str, // always "result"
87    pub call_id: String,
88    #[serde(skip_serializing_if = "Option::is_none")]
89    pub op_id: Option<String>,
90    #[serde(skip_serializing_if = "Option::is_none")]
91    pub data: Option<serde_json::Value>,
92    #[serde(skip_serializing_if = "Option::is_none")]
93    pub error: Option<ErrorInfo>,
94}
95
96impl DbResultMessage {
97    pub fn ok(call_id: String, data: serde_json::Value) -> Self {
98        Self {
99            msg_type: "result",
100            call_id,
101            op_id: None,
102            data: Some(data),
103            error: None,
104        }
105    }
106
107    pub fn ok_with_op(call_id: String, op_id: Option<String>, data: serde_json::Value) -> Self {
108        Self {
109            msg_type: "result",
110            call_id,
111            op_id,
112            data: Some(data),
113            error: None,
114        }
115    }
116
117    pub fn err(call_id: String, code: &str, message: &str) -> Self {
118        Self {
119            msg_type: "result",
120            call_id,
121            op_id: None,
122            data: None,
123            error: Some(ErrorInfo {
124                code: code.to_string(),
125                message: message.to_string(),
126            }),
127        }
128    }
129
130    pub fn err_with_op(call_id: String, op_id: Option<String>, code: &str, message: &str) -> Self {
131        Self {
132            msg_type: "result",
133            call_id,
134            op_id,
135            data: None,
136            error: Some(ErrorInfo {
137                code: code.to_string(),
138                message: message.to_string(),
139            }),
140        }
141    }
142}
143
144// ---------------------------------------------------------------------------
145// TypeScript → Rust messages
146// ---------------------------------------------------------------------------
147
148/// A message from the TypeScript handler back to Rust.
149#[derive(Debug, Clone, Deserialize)]
150#[serde(tag = "type")]
151pub enum TsMessage {
152    /// DB operation request.
153    #[serde(rename = "db")]
154    Db(DbOpMessage),
155
156    /// Stream a chunk to the HTTP client (SSE).
157    #[serde(rename = "stream")]
158    Stream(StreamChunkMessage),
159
160    /// Schedule a function for later execution.
161    #[serde(rename = "schedule")]
162    Schedule(ScheduleMessage),
163
164    /// Cancel a previously scheduled function.
165    #[serde(rename = "cancel_schedule")]
166    CancelSchedule(CancelScheduleMessage),
167
168    /// Call another function (for actions calling queries/mutations).
169    #[serde(rename = "run_fn")]
170    RunFn(RunFnMessage),
171
172    /// Function completed successfully.
173    #[serde(rename = "return")]
174    Return(ReturnMessage),
175
176    /// Function failed with an error.
177    #[serde(rename = "error")]
178    Error(ErrorMessage),
179
180    /// Initial handshake from the runtime: the list of functions it loaded.
181    /// Sent once at startup before any other message.
182    #[serde(rename = "ready")]
183    Ready(ReadyMessage),
184}
185
186/// Handshake payload from the TS runtime.
187#[derive(Debug, Clone, Deserialize)]
188pub struct ReadyMessage {
189    #[serde(default)]
190    pub functions: Vec<crate::registry::FnDef>,
191    #[serde(default)]
192    pub error: Option<String>,
193}
194
195/// A database operation request from TypeScript.
196#[derive(Debug, Clone, Deserialize)]
197pub struct DbOpMessage {
198    pub call_id: String,
199    /// Optional per-RPC id minted by the TS side. When present, the Rust
200    /// reply echoes it back on `DbResultMessage.op_id` so the TS runtime
201    /// can demux concurrent DB ops from a single handler (Promise.all).
202    /// Legacy TS runtimes that don't send op_id keep the old behavior:
203    /// only one in-flight RPC per call_id at a time.
204    #[serde(default)]
205    pub op_id: Option<String>,
206    pub op: DbOp,
207    pub entity: String,
208    #[serde(default)]
209    pub id: Option<String>,
210    #[serde(default)]
211    pub data: Option<serde_json::Value>,
212    #[serde(default)]
213    pub field: Option<String>,
214    #[serde(default)]
215    pub value: Option<String>,
216    #[serde(default)]
217    pub relation: Option<String>,
218    #[serde(default)]
219    pub target_id: Option<String>,
220    /// Cursor pagination — `paginate` op only. Opaque id-after cursor.
221    #[serde(default)]
222    pub after: Option<String>,
223    /// Cursor pagination — `paginate` op only. Requested page size.
224    #[serde(default)]
225    pub limit: Option<u32>,
226}
227
228/// Database operations available to TypeScript functions.
229#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
230#[serde(rename_all = "snake_case")]
231pub enum DbOp {
232    Get,
233    List,
234    /// Cursor-paginated list. Uses `after` + `limit` on [`DbOpMessage`].
235    /// Response shape: `{ page: Row[], nextCursor: string | null, isDone: bool }`.
236    Paginate,
237    Insert,
238    Update,
239    Delete,
240    Lookup,
241    Query,
242    QueryGraph,
243    Link,
244    Unlink,
245}
246
247/// A stream chunk to forward to the HTTP client as SSE.
248#[derive(Debug, Clone, Deserialize)]
249pub struct StreamChunkMessage {
250    pub call_id: String,
251    pub data: String,
252    /// Optional event type for SSE (defaults to "message").
253    #[serde(default)]
254    pub event: Option<String>,
255}
256
257/// Schedule a function for future execution.
258#[derive(Debug, Clone, Deserialize)]
259pub struct ScheduleMessage {
260    pub call_id: String,
261    pub fn_name: String,
262    pub args: serde_json::Value,
263    /// Run after this many milliseconds.
264    #[serde(default)]
265    pub delay_ms: Option<u64>,
266    /// Run at this Unix timestamp (ms since epoch).
267    #[serde(default)]
268    pub run_at: Option<u64>,
269}
270
271/// Cancel a scheduled function.
272#[derive(Debug, Clone, Deserialize)]
273pub struct CancelScheduleMessage {
274    pub call_id: String,
275    pub schedule_id: String,
276}
277
278/// Call another function from within an action.
279#[derive(Debug, Clone, Deserialize)]
280pub struct RunFnMessage {
281    pub call_id: String,
282    pub fn_name: String,
283    pub fn_type: FnType,
284    pub args: serde_json::Value,
285}
286
287/// Function returned successfully.
288#[derive(Debug, Clone, Deserialize)]
289pub struct ReturnMessage {
290    pub call_id: String,
291    pub value: serde_json::Value,
292}
293
294/// Function failed.
295#[derive(Debug, Clone, Deserialize)]
296pub struct ErrorMessage {
297    pub call_id: String,
298    pub code: String,
299    pub message: String,
300}
301
302// ---------------------------------------------------------------------------
303// Shared types
304// ---------------------------------------------------------------------------
305
306/// Function type.
307#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
308#[serde(rename_all = "snake_case")]
309pub enum FnType {
310    Query,
311    Mutation,
312    Action,
313}
314
315/// Auth context passed to function handlers.
316///
317/// Mirrors the three fields of the runtime's `AuthContext` that a
318/// mutation can legitimately read: the authenticated user id, admin
319/// flag, and active tenant. Functions that gate on `ctx.auth.tenantId`
320/// (anything org-scoped in a B2B app) need the last one forwarded — it's
321/// easy to forget and catches out every new multi-tenant example until
322/// someone hits "why is my session_tenant always null inside functions".
323#[derive(Debug, Clone, Serialize, Deserialize)]
324pub struct AuthInfo {
325    #[serde(skip_serializing_if = "Option::is_none")]
326    pub user_id: Option<String>,
327    pub is_admin: bool,
328    #[serde(default, skip_serializing_if = "Option::is_none")]
329    pub tenant_id: Option<String>,
330}
331
332/// Error info in protocol messages.
333#[derive(Debug, Clone, Serialize, Deserialize)]
334pub struct ErrorInfo {
335    pub code: String,
336    pub message: String,
337}
338
#[cfg(test)]
mod tests {
    //! Wire-format tests for the protocol types.
    //!
    //! Outgoing messages are serialized and parsed back into a JSON tree
    //! before asserting. Substring checks on the encoded text would also
    //! match a key name that merely appears inside a payload, and cannot
    //! prove that an optional key was really omitted.

    use super::*;
    use serde_json::{json, Value};

    /// Serialize `msg` into a JSON tree.
    fn to_wire<T: serde::Serialize>(msg: &T) -> Value {
        serde_json::to_value(msg).expect("protocol message should serialize")
    }

    /// Parse one line of TypeScript → Rust traffic.
    fn from_wire(line: &str) -> TsMessage {
        serde_json::from_str(line).expect("line should be a valid TsMessage")
    }

    #[test]
    fn call_message_serializes() {
        let msg = CallMessage::new(
            "c1".into(),
            "placeBid".into(),
            FnType::Mutation,
            json!({"lotId": "lot_1", "amount": 100}),
            AuthInfo {
                user_id: Some("user_1".into()),
                is_admin: false,
                tenant_id: None,
            },
        );
        let wire = to_wire(&msg);
        assert_eq!(wire["type"], "call");
        assert_eq!(wire["call_id"], "c1");
        assert_eq!(wire["fn_name"], "placeBid");
        assert_eq!(wire["fn_type"], "mutation");
        assert_eq!(wire["args"], json!({"lotId": "lot_1", "amount": 100}));
        // `tenant_id: None` must be dropped, not encoded as null.
        assert_eq!(wire["auth"], json!({"user_id": "user_1", "is_admin": false}));
        // No HTTP context was attached, so the key must be absent.
        assert!(wire.get("request").is_none());
    }

    #[test]
    fn call_message_with_request_serializes_request() {
        let mut headers = std::collections::HashMap::new();
        headers.insert("stripe-signature".to_string(), "sig".to_string());
        let msg = CallMessage::new(
            "c1".into(),
            "onWebhook".into(),
            FnType::Action,
            json!({}),
            AuthInfo {
                user_id: None,
                is_admin: false,
                tenant_id: Some("t1".into()),
            },
        )
        .with_request(RequestInfo {
            method: "POST".into(),
            path: "/hooks/stripe".into(),
            headers,
            raw_body: "{}".into(),
        });
        let wire = to_wire(&msg);
        assert_eq!(wire["fn_type"], "action");
        assert_eq!(wire["auth"], json!({"is_admin": false, "tenant_id": "t1"}));
        assert_eq!(wire["request"]["method"], "POST");
        assert_eq!(wire["request"]["path"], "/hooks/stripe");
        assert_eq!(wire["request"]["headers"]["stripe-signature"], "sig");
        assert_eq!(wire["request"]["raw_body"], "{}");
    }

    #[test]
    fn ts_message_deserializes_db_op() {
        let msg =
            from_wire(r#"{"type":"db","call_id":"c1","op":"get","entity":"Lot","id":"lot_1"}"#);
        match msg {
            TsMessage::Db(db) => {
                assert_eq!(db.call_id, "c1");
                assert_eq!(db.op, DbOp::Get);
                assert_eq!(db.entity, "Lot");
                assert_eq!(db.id.as_deref(), Some("lot_1"));
                // Keys the sender left out fall back to `None`.
                assert!(db.op_id.is_none());
                assert!(db.data.is_none());
                assert!(db.after.is_none());
                assert!(db.limit.is_none());
            }
            other => panic!("expected Db message, got {:?}", other),
        }
    }

    #[test]
    fn ts_message_deserializes_paginate_with_op_id() {
        let msg = from_wire(
            r#"{"type":"db","call_id":"c1","op_id":"o7","op":"paginate","entity":"Lot","after":"lot_9","limit":25}"#,
        );
        match msg {
            TsMessage::Db(db) => {
                assert_eq!(db.op, DbOp::Paginate);
                assert_eq!(db.op_id.as_deref(), Some("o7"));
                assert_eq!(db.after.as_deref(), Some("lot_9"));
                assert_eq!(db.limit, Some(25));
            }
            other => panic!("expected Db message, got {:?}", other),
        }
    }

    #[test]
    fn ts_message_deserializes_stream() {
        let msg = from_wire(r#"{"type":"stream","call_id":"c1","data":"hello"}"#);
        match msg {
            TsMessage::Stream(s) => {
                assert_eq!(s.call_id, "c1");
                assert_eq!(s.data, "hello");
                assert!(s.event.is_none());
            }
            other => panic!("expected Stream message, got {:?}", other),
        }
    }

    #[test]
    fn ts_message_deserializes_return() {
        let msg = from_wire(r#"{"type":"return","call_id":"c1","value":{"ok":true}}"#);
        match msg {
            TsMessage::Return(r) => {
                assert_eq!(r.call_id, "c1");
                assert_eq!(r.value, json!({"ok": true}));
            }
            other => panic!("expected Return message, got {:?}", other),
        }
    }

    #[test]
    fn ts_message_deserializes_schedule() {
        let msg = from_wire(
            r#"{"type":"schedule","call_id":"c1","fn_name":"closeLot","args":{"lotId":"x"},"delay_ms":5000}"#,
        );
        match msg {
            TsMessage::Schedule(s) => {
                assert_eq!(s.fn_name, "closeLot");
                assert_eq!(s.args, json!({"lotId": "x"}));
                assert_eq!(s.delay_ms, Some(5000));
                assert!(s.run_at.is_none());
            }
            other => panic!("expected Schedule message, got {:?}", other),
        }
    }

    #[test]
    fn ts_message_deserializes_ready_with_defaults() {
        match from_wire(r#"{"type":"ready"}"#) {
            TsMessage::Ready(ready) => {
                assert!(ready.functions.is_empty());
                assert!(ready.error.is_none());
            }
            other => panic!("expected Ready message, got {:?}", other),
        }
    }

    #[test]
    fn ts_message_rejects_unknown_type() {
        let parsed = serde_json::from_str::<TsMessage>(r#"{"type":"nope","call_id":"c1"}"#);
        assert!(parsed.is_err());
    }

    #[test]
    fn db_result_ok() {
        let wire = to_wire(&DbResultMessage::ok("c1".into(), json!({"id": "x"})));
        // Exact shape: neither `op_id` nor `error` may appear.
        assert_eq!(
            wire,
            json!({"type": "result", "call_id": "c1", "data": {"id": "x"}})
        );
    }

    #[test]
    fn db_result_ok_payload_may_mention_error() {
        // A payload key named "error" must not be mistaken for the
        // top-level error slot.
        let wire = to_wire(&DbResultMessage::ok("c1".into(), json!({"error": "none"})));
        assert!(wire.get("error").is_none());
        assert_eq!(wire["data"]["error"], "none");
    }

    #[test]
    fn db_result_err() {
        let wire = to_wire(&DbResultMessage::err("c1".into(), "NOT_FOUND", "not found"));
        // Exact shape: neither `op_id` nor `data` may appear.
        assert_eq!(
            wire,
            json!({
                "type": "result",
                "call_id": "c1",
                "error": {"code": "NOT_FOUND", "message": "not found"}
            })
        );
    }

    #[test]
    fn db_result_with_op_echoes_op_id() {
        let ok = to_wire(&DbResultMessage::ok_with_op(
            "c1".into(),
            Some("o1".into()),
            json!(1),
        ));
        assert_eq!(ok["op_id"], "o1");
        assert_eq!(ok["data"], json!(1));
        assert!(ok.get("error").is_none());

        let err = to_wire(&DbResultMessage::err_with_op(
            "c1".into(),
            Some("o2".into()),
            "E",
            "boom",
        ));
        assert_eq!(err["op_id"], "o2");
        assert_eq!(err["error"], json!({"code": "E", "message": "boom"}));
        assert!(err.get("data").is_none());
    }
}