pylon-functions 0.3.23

Pylon — realtime backend as a single Rust binary. Schema, policies, server functions, live queries, auth — one process.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
//! Bidirectional NDJSON protocol between Rust runtime and TypeScript process.
//!
//! Messages are newline-delimited JSON objects. Each function invocation gets
//! a unique `call_id` for multiplexing concurrent calls over a single connection.

use serde::{Deserialize, Serialize};

// ---------------------------------------------------------------------------
// Rust → TypeScript messages
// ---------------------------------------------------------------------------

/// Invoke a function on the TypeScript side.
///
/// Sent Rust → TypeScript as a single JSON object. Construct it with
/// [`CallMessage::new`] so the `type` discriminator is filled in, and chain
/// [`CallMessage::with_request`] when HTTP context should be forwarded.
#[derive(Debug, Clone, Serialize)]
pub struct CallMessage {
    /// Wire discriminator, emitted under the JSON key `type`.
    #[serde(rename = "type")]
    pub msg_type: &'static str, // always "call"
    /// Unique id of this invocation; lets concurrent calls share one
    /// connection.
    pub call_id: String,
    /// Name of the function to invoke.
    pub fn_name: String,
    /// Whether the target is a query, a mutation, or an action.
    pub fn_type: FnType,
    /// Arguments for the function, as arbitrary JSON.
    pub args: serde_json::Value,
    /// Auth context handed to the function handler.
    pub auth: AuthInfo,
    /// HTTP request context — present only when the action is invoked via
    /// a custom HTTP route (`defineRoute` binding). Actions called from
    /// other actions via `ctx.runAction` or from jobs don't get this.
    /// Enables Stripe-webhook-style signature verification + access to
    /// raw headers/body the router would otherwise discard.
    ///
    /// Omitted from the JSON entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub request: Option<RequestInfo>,
}

/// HTTP request metadata forwarded to TypeScript actions invoked via
/// `defineRoute` bindings. Every value is a string (`headers` is a
/// string → string map) so the TS side can use them directly without
/// re-parsing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RequestInfo {
    /// Uppercased method — `POST`, `GET`, etc.
    pub method: String,
    /// Full request path (with query string if any).
    pub path: String,
    /// Lowercased header names → values. Multi-value headers are joined
    /// with `, ` per RFC 7230. This trades some fidelity for a map shape
    /// that's ergonomic to consume from TS.
    pub headers: std::collections::HashMap<String, String>,
    /// The exact bytes of the request body, UTF-8-decoded. Webhook
    /// signature verification (Stripe, GitHub) needs the bytes that were
    /// signed, so this is NOT the parsed JSON.
    ///
    /// NOTE(review): how a body that is not valid UTF-8 is handled
    /// (rejected vs. lossily decoded) is decided by whoever builds this
    /// struct and is not visible here — confirm against the router.
    pub raw_body: String,
}

impl CallMessage {
    pub fn new(
        call_id: String,
        fn_name: String,
        fn_type: FnType,
        args: serde_json::Value,
        auth: AuthInfo,
    ) -> Self {
        Self {
            msg_type: "call",
            call_id,
            fn_name,
            fn_type,
            args,
            auth,
            request: None,
        }
    }

    /// Attach HTTP request metadata (used when the call originated from a
    /// `defineRoute` HTTP binding rather than a programmatic invocation).
    pub fn with_request(mut self, request: RequestInfo) -> Self {
        self.request = Some(request);
        self
    }
}

/// Result of a DB operation, sent back to TypeScript.
///
/// `op_id` is echoed from the incoming `DbOpMessage.op_id` when present.
/// The TS runtime uses it to demux concurrent DB ops inside a single
/// function call (e.g. `Promise.all([ctx.db.get(a), ctx.db.get(b)])`).
/// Absent `op_id` keeps legacy TS runtimes compatible.
///
/// The constructors on this type populate exactly one of `data` / `error`;
/// every `None` field is left out of the serialized JSON.
#[derive(Debug, Clone, Serialize)]
pub struct DbResultMessage {
    /// Wire discriminator, emitted under the JSON key `type`.
    #[serde(rename = "type")]
    pub msg_type: &'static str, // always "result"
    /// Id of the function invocation this result is addressed to.
    pub call_id: String,
    /// Per-op id echoed back for demuxing; `None` for legacy callers.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub op_id: Option<String>,
    /// Success payload; set by `ok` / `ok_with_op`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub data: Option<serde_json::Value>,
    /// Failure details; set by `err` / `err_with_op`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<ErrorInfo>,
}

impl DbResultMessage {
    pub fn ok(call_id: String, data: serde_json::Value) -> Self {
        Self {
            msg_type: "result",
            call_id,
            op_id: None,
            data: Some(data),
            error: None,
        }
    }

    pub fn ok_with_op(call_id: String, op_id: Option<String>, data: serde_json::Value) -> Self {
        Self {
            msg_type: "result",
            call_id,
            op_id,
            data: Some(data),
            error: None,
        }
    }

    pub fn err(call_id: String, code: &str, message: &str) -> Self {
        Self {
            msg_type: "result",
            call_id,
            op_id: None,
            data: None,
            error: Some(ErrorInfo {
                code: code.to_string(),
                message: message.to_string(),
            }),
        }
    }

    pub fn err_with_op(call_id: String, op_id: Option<String>, code: &str, message: &str) -> Self {
        Self {
            msg_type: "result",
            call_id,
            op_id,
            data: None,
            error: Some(ErrorInfo {
                code: code.to_string(),
                message: message.to_string(),
            }),
        }
    }
}

// ---------------------------------------------------------------------------
// TypeScript → Rust messages
// ---------------------------------------------------------------------------

/// A message from the TypeScript handler back to Rust.
///
/// Internally tagged on the JSON key `type`. Each variant's tag is the
/// snake_case form of its name, quoted in the variant docs below.
#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum TsMessage {
    /// `"db"` — DB operation request.
    Db(DbOpMessage),

    /// `"stream"` — stream a chunk to the HTTP client (SSE).
    Stream(StreamChunkMessage),

    /// `"schedule"` — schedule a function for later execution.
    Schedule(ScheduleMessage),

    /// `"cancel_schedule"` — cancel a previously scheduled function.
    CancelSchedule(CancelScheduleMessage),

    /// `"run_fn"` — call another function (for actions calling
    /// queries/mutations).
    RunFn(RunFnMessage),

    /// `"return"` — function completed successfully.
    Return(ReturnMessage),

    /// `"error"` — function failed with an error.
    Error(ErrorMessage),

    /// `"ready"` — initial handshake from the runtime: the list of
    /// functions it loaded. Sent once at startup before any other message.
    Ready(ReadyMessage),
}

/// Handshake payload from the TS runtime.
///
/// Both fields are optional on the wire and fall back to their defaults
/// (empty list / `None`) when missing.
#[derive(Debug, Clone, Deserialize)]
pub struct ReadyMessage {
    /// Functions the runtime loaded.
    #[serde(default)]
    pub functions: Vec<crate::registry::FnDef>,
    /// NOTE(review): presumably a description of a startup/load failure
    /// on the TS side — confirm against the runtime that emits it.
    #[serde(default)]
    pub error: Option<String>,
}

/// A database operation request from TypeScript.
///
/// Only `call_id`, `op` and `entity` are required on the wire; every other
/// field defaults to `None` when absent. Which optional fields a given
/// [`DbOp`] actually reads is decided by the code that dispatches the op,
/// not by this type.
#[derive(Debug, Clone, Deserialize)]
pub struct DbOpMessage {
    /// Id of the function invocation issuing this op.
    pub call_id: String,
    /// Optional per-RPC id minted by the TS side. When present, the Rust
    /// reply echoes it back on `DbResultMessage.op_id` so the TS runtime
    /// can demux concurrent DB ops from a single handler (Promise.all).
    /// Legacy TS runtimes that don't send op_id keep the old behavior:
    /// only one in-flight RPC per call_id at a time.
    #[serde(default)]
    pub op_id: Option<String>,
    /// Which operation to perform.
    pub op: DbOp,
    /// Name of the entity the op targets (e.g. `"Lot"`).
    pub entity: String,
    /// Row id. NOTE(review): presumably used by single-row ops such as
    /// `get`/`update`/`delete` — confirm against the dispatcher.
    #[serde(default)]
    pub id: Option<String>,
    /// JSON payload. For `search` this carries the query body (see
    /// [`DbOp::Search`]); presumably also the row body for
    /// `insert`/`update` — TODO confirm.
    #[serde(default)]
    pub data: Option<serde_json::Value>,
    /// NOTE(review): presumably the field name for a `lookup` — confirm.
    #[serde(default)]
    pub field: Option<String>,
    /// NOTE(review): presumably the value matched against `field` for a
    /// `lookup`; note it is string-typed — confirm.
    #[serde(default)]
    pub value: Option<String>,
    /// NOTE(review): presumably the relation name for `link`/`unlink` —
    /// confirm.
    #[serde(default)]
    pub relation: Option<String>,
    /// NOTE(review): presumably the id on the other side of the relation
    /// for `link`/`unlink` — confirm.
    #[serde(default)]
    pub target_id: Option<String>,
    /// Cursor pagination — `paginate` op only. Opaque id-after cursor.
    #[serde(default)]
    pub after: Option<String>,
    /// Cursor pagination — `paginate` op only. Requested page size.
    #[serde(default)]
    pub limit: Option<u32>,
}

/// Database operations available to TypeScript functions.
///
/// On the wire each variant is its snake_case name, e.g. `Get` ⇄ `"get"`
/// and `QueryGraph` ⇄ `"query_graph"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum DbOp {
    Get,
    List,
    /// Cursor-paginated list. Uses `after` + `limit` on [`DbOpMessage`].
    /// Response shape: `{ page: Row[], nextCursor: string | null, isDone: bool }`.
    Paginate,
    Insert,
    Update,
    Delete,
    Lookup,
    Query,
    QueryGraph,
    Link,
    Unlink,
    /// Faceted full-text search. The query body is whatever the entity
    /// declares in its `search:` config — query string, filters,
    /// facets, sort, page, pageSize. Carried on `data`.
    /// Response shape: `{ hits, facetCounts, total, tookMs }`.
    Search,
}

/// A stream chunk to forward to the HTTP client as SSE.
#[derive(Debug, Clone, Deserialize)]
pub struct StreamChunkMessage {
    /// Id of the function invocation producing the stream.
    pub call_id: String,
    /// Chunk payload, as a string.
    pub data: String,
    /// Optional event type for SSE (defaults to "message").
    /// `None` when the TS side omits the key.
    #[serde(default)]
    pub event: Option<String>,
}

/// Schedule a function for future execution.
///
/// `delay_ms` and `run_at` are both optional on the wire.
/// NOTE(review): this type does not say which one wins when both are set,
/// or what happens when neither is — confirm against the scheduler.
#[derive(Debug, Clone, Deserialize)]
pub struct ScheduleMessage {
    /// Id of the function invocation requesting the schedule.
    pub call_id: String,
    /// Name of the function to run later.
    pub fn_name: String,
    /// Arguments for the scheduled function, as arbitrary JSON. Required
    /// on the wire (no default).
    pub args: serde_json::Value,
    /// Run after this many milliseconds.
    #[serde(default)]
    pub delay_ms: Option<u64>,
    /// Run at this Unix timestamp (ms since epoch).
    #[serde(default)]
    pub run_at: Option<u64>,
}

/// Cancel a scheduled function.
#[derive(Debug, Clone, Deserialize)]
pub struct CancelScheduleMessage {
    /// Id of the function invocation requesting the cancellation.
    pub call_id: String,
    /// Id of the schedule to cancel. NOTE(review): presumably the id the
    /// runtime handed back when the function was scheduled — confirm.
    pub schedule_id: String,
}

/// Call another function from within an action.
#[derive(Debug, Clone, Deserialize)]
pub struct RunFnMessage {
    /// Id of the function invocation making the nested call.
    pub call_id: String,
    /// Name of the function to run.
    pub fn_name: String,
    /// Kind of the function being run (query, mutation, or action).
    pub fn_type: FnType,
    /// Arguments for the nested function, as arbitrary JSON.
    pub args: serde_json::Value,
}

/// Function returned successfully.
#[derive(Debug, Clone, Deserialize)]
pub struct ReturnMessage {
    /// Id of the function invocation that finished.
    pub call_id: String,
    /// The function's return value, as arbitrary JSON.
    pub value: serde_json::Value,
}

/// Function failed.
///
/// Both `code` and `message` are required on the wire.
#[derive(Debug, Clone, Deserialize)]
pub struct ErrorMessage {
    /// Id of the function invocation that failed.
    pub call_id: String,
    /// Error code string; its vocabulary is not constrained by this type.
    pub code: String,
    /// Error message text.
    pub message: String,
}

// ---------------------------------------------------------------------------
// Shared types
// ---------------------------------------------------------------------------

/// Function type.
///
/// Serialized and deserialized as the snake_case variant name:
/// `"query"`, `"mutation"`, or `"action"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FnType {
    Query,
    Mutation,
    Action,
}

/// Auth context passed to function handlers.
///
/// Mirrors the three fields of the runtime's `AuthContext` that a
/// mutation can legitimately read: the authenticated user id, admin
/// flag, and active tenant. Functions that gate on `ctx.auth.tenantId`
/// (anything org-scoped in a B2B app) need the last one forwarded — it's
/// easy to forget and catches out every new multi-tenant example until
/// someone hits "why is my session_tenant always null inside functions".
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuthInfo {
    /// Authenticated user id. Left out of the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub user_id: Option<String>,
    /// Admin flag. Always serialized, and required when deserializing.
    pub is_admin: bool,
    /// Active tenant. Left out of the JSON when `None`; a missing key
    /// deserializes to `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tenant_id: Option<String>,
}

/// Error info in protocol messages.
///
/// Embedded in `DbResultMessage.error` when a DB operation fails.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorInfo {
    /// Error code string; its vocabulary is not constrained by this type.
    pub code: String,
    /// Error message text.
    pub message: String,
}

// Wire-format tests: each one pins the JSON shape of a single message kind
// so accidental renames of tags or fields are caught.
#[cfg(test)]
mod tests {
    use super::*;

    // Outgoing call: the `type` tag is "call" and `fn_type` is snake_case.
    #[test]
    fn call_message_serializes() {
        let msg = CallMessage::new(
            "c1".into(),
            "placeBid".into(),
            FnType::Mutation,
            serde_json::json!({"lotId": "lot_1", "amount": 100}),
            AuthInfo {
                user_id: Some("user_1".into()),
                is_admin: false,
                tenant_id: None,
            },
        );
        let json = serde_json::to_string(&msg).unwrap();
        assert!(json.contains("\"type\":\"call\""));
        assert!(json.contains("\"fn_type\":\"mutation\""));
    }

    // Incoming "db" message: required fields plus the optional `id` parse;
    // all other optional fields are absent from the input.
    #[test]
    fn ts_message_deserializes_db_op() {
        let json = r#"{"type":"db","call_id":"c1","op":"get","entity":"Lot","id":"lot_1"}"#;
        let msg: TsMessage = serde_json::from_str(json).unwrap();
        match msg {
            TsMessage::Db(db) => {
                assert_eq!(db.call_id, "c1");
                assert_eq!(db.op, DbOp::Get);
                assert_eq!(db.entity, "Lot");
                assert_eq!(db.id.as_deref(), Some("lot_1"));
            }
            _ => panic!("expected Db message"),
        }
    }

    // Incoming "stream" message: a missing `event` key becomes `None`.
    #[test]
    fn ts_message_deserializes_stream() {
        let json = r#"{"type":"stream","call_id":"c1","data":"hello"}"#;
        let msg: TsMessage = serde_json::from_str(json).unwrap();
        match msg {
            TsMessage::Stream(s) => {
                assert_eq!(s.data, "hello");
                assert!(s.event.is_none());
            }
            _ => panic!("expected Stream message"),
        }
    }

    // Incoming "return" message: `value` is kept as arbitrary JSON.
    #[test]
    fn ts_message_deserializes_return() {
        let json = r#"{"type":"return","call_id":"c1","value":{"ok":true}}"#;
        let msg: TsMessage = serde_json::from_str(json).unwrap();
        match msg {
            TsMessage::Return(r) => {
                assert_eq!(r.value, serde_json::json!({"ok": true}));
            }
            _ => panic!("expected Return message"),
        }
    }

    // Incoming "schedule" message with `delay_ms` set and `run_at` omitted.
    #[test]
    fn ts_message_deserializes_schedule() {
        let json = r#"{"type":"schedule","call_id":"c1","fn_name":"closeLot","args":{"lotId":"x"},"delay_ms":5000}"#;
        let msg: TsMessage = serde_json::from_str(json).unwrap();
        match msg {
            TsMessage::Schedule(s) => {
                assert_eq!(s.fn_name, "closeLot");
                assert_eq!(s.delay_ms, Some(5000));
            }
            _ => panic!("expected Schedule message"),
        }
    }

    // Success result: tagged "result" and the `error` key is omitted.
    #[test]
    fn db_result_ok() {
        let msg = DbResultMessage::ok("c1".into(), serde_json::json!({"id": "x"}));
        let json = serde_json::to_string(&msg).unwrap();
        assert!(json.contains("\"type\":\"result\""));
        assert!(!json.contains("\"error\""));
    }

    // Failure result: `error` is present and the `data` key is omitted.
    #[test]
    fn db_result_err() {
        let msg = DbResultMessage::err("c1".into(), "NOT_FOUND", "not found");
        let json = serde_json::to_string(&msg).unwrap();
        assert!(json.contains("\"error\""));
        assert!(!json.contains("\"data\""));
    }
}