Skip to main content

cairn_core/
live.rs

1//! The in-process live hub (design.md §10, slice 4a/4c).
2//!
3//! v1 model: **broadcast-refresh**. Cairn is stateless TEA — app state
4//! lives in SQLite (slice 1), so a connection is just the request that
5//! produced its page. When a request performs the `Live` effect
6//! (`publish(topic)`), every connection subscribed to that topic is
7//! re-run against the now-current database and the fresh view is what
8//! gets delivered. This is exactly the Turbo-8 broadcast-refresh model
9//! the research identified as the minimal-wiring live path; the
10//! Slice-2 structural `diff` is the documented *next* optimization
11//! (push `Change`s + a client morph instead of the full view).
12//!
13//! This module is the mechanism, fully testable in process. Real
14//! sockets (slice 4d) are a thin shell: write each returned body down
15//! that connection's WebSocket/SSE. The publish seam is thread-local
16//! (`wasm::drain_published`), so `request` and `drain_and_push` must be
17//! called on the same thread — true for the in-process hub and for a
18//! per-connection worker; a multi-tenant server swaps the seam for a
19//! real bus without touching the language surface.
20
21use crate::wasm::{
22    drain_published, drain_resp_headers, reason_phrase, serve_request_db,
23    HttpResponse, RunError,
24};
25use std::collections::HashMap;
26use std::io::{BufRead, BufReader, Read, Write};
27use std::net::{TcpListener, TcpStream};
28use std::time::Duration;
29
30/// A live connection: the request that rendered its page, plus the
31/// topics that page subscribes to (the app's `subscriptions(Model)`
32/// supplies these; the hub is topic-source-agnostic).
33struct Conn {
34    id: u64,
35    topics: Vec<String>,
36    method: String,
37    path: String,
38    body: String,
39    /// The serialized prior view (design.md §10, A3). Empty unless a
40    /// `live_handler` is set; then it is the `serialize_element` of the
41    /// last view this connection was sent, fed back so the canonical
42    /// Cairn `diff` runs against it.
43    prev: String,
44}
45
46/// Holds the lowered app and its open connections; turns `publish`
47/// into targeted re-renders. With a `live_handler` set the re-render
48/// is a structural diff (design.md §10, A3); otherwise it is the
49/// broadcast-refresh full page (A1).
50pub struct LiveHub {
51    wasm: Vec<u8>,
52    handler: String,
53    live_handler: Option<String>,
54    db_path: String,
55    next: u64,
56    conns: Vec<Conn>,
57}
58
59/// Split a `run_app_live` envelope `"<len>:<payload><rest>"` into
60/// `(payload, rest)`. `len` is the byte length of `payload`; `rest` is
61/// the trailing serialized view. A body with no ':' (a non-live
62/// handler) is returned as `(body, "")` so the broadcast path is
63/// unaffected.
64fn split_envelope(body: &str) -> (String, String) {
65    let Some(colon) = body.find(':') else {
66        return (body.to_string(), String::new());
67    };
68    let Ok(n) = body[..colon].parse::<usize>() else {
69        return (body.to_string(), String::new());
70    };
71    let start = colon + 1;
72    let end = start + n;
73    if end > body.len() {
74        return (body.to_string(), String::new());
75    }
76    (body[start..end].to_string(), body[end..].to_string())
77}
78
79impl LiveHub {
80    pub fn new(wasm: Vec<u8>, handler: &str, db_path: &str) -> Self {
81        Self {
82            wasm,
83            handler: handler.into(),
84            live_handler: None,
85            db_path: db_path.into(),
86            next: 1,
87            conns: Vec::new(),
88        }
89    }
90
91    /// Opt this hub into diff-based push (design.md §10, A3): `name` is
92    /// the app's `<handler>_live` entry (the `run_app_live` envelope —
93    /// full HTML on first render, the Change wire thereafter). Without
94    /// this the hub stays broadcast-refresh (A1), unchanged.
95    pub fn set_live_handler(&mut self, name: &str) {
96        self.live_handler = Some(name.into());
97    }
98
99    /// Open a connection for a page request. Returns its id and the
100    /// initial rendered response. `topics` is what this page subscribes
101    /// to. Any publishes during the initial render are discarded — a
102    /// fresh page is its own latest state.
103    pub fn connect(
104        &mut self,
105        topics: Vec<String>,
106        method: &str,
107        path: &str,
108        body: &str,
109    ) -> Result<(u64, HttpResponse), RunError> {
110        // With diff-push the first render goes through `<handler>_live`
111        // with an empty prior: the envelope's payload is the full HTML
112        // document (returned to the browser) and its trailing part is
113        // the serialized baseline this connection diffs against next.
114        let (resp, prev) = if let Some(live) = self.live_handler.clone() {
115            let env = serve_request_db(
116                &self.wasm,
117                &live,
118                &self.db_path,
119                method,
120                path,
121                "",
122            )?;
123            let (html, ser) = split_envelope(&env.body);
124            (
125                HttpResponse {
126                    status: env.status,
127                    body: html,
128                },
129                ser,
130            )
131        } else {
132            let resp = serve_request_db(
133                &self.wasm,
134                &self.handler,
135                &self.db_path,
136                method,
137                path,
138                body,
139            )?;
140            (resp, String::new())
141        };
142        let _ = drain_published();
143        let _ = drain_resp_headers(); // SSE stream carries no app headers
144        let id = self.next;
145        self.next += 1;
146        self.conns.push(Conn {
147            id,
148            topics,
149            method: method.into(),
150            path: path.into(),
151            body: body.into(),
152            prev,
153        });
154        Ok((id, resp))
155    }
156
157    /// Like [`connect`](Self::connect) but the page's topics come from
158    /// the app itself: a `subscriptions(req) -> Response` handler whose
159    /// body is the whitespace-delimited topic list (the v1 ABI form of
160    /// design.md §10's `subscriptions: Fn(Model) -> List<Topic>` —
161    /// pure, explicit, reviewed, no magic dependency capture). An empty
162    /// body means the page subscribes to nothing.
163    pub fn connect_subscribed(
164        &mut self,
165        subs_handler: &str,
166        method: &str,
167        path: &str,
168        body: &str,
169    ) -> Result<(u64, HttpResponse), RunError> {
170        let subs = serve_request_db(
171            &self.wasm,
172            subs_handler,
173            &self.db_path,
174            method,
175            path,
176            body,
177        )?;
178        let _ = drain_published();
179        let _ = drain_resp_headers();
180        let topics: Vec<String> = subs
181            .body
182            .split_whitespace()
183            .map(str::to_string)
184            .collect();
185        self.connect(topics, method, path, body)
186    }
187
188    /// A one-shot request that is not itself a live connection (e.g. the
189    /// POST that mutates state and `publish`es). Publishes it performs
190    /// are queued on this thread for the next `drain_and_push`.
191    pub fn request(
192        &self,
193        method: &str,
194        path: &str,
195        body: &str,
196    ) -> Result<HttpResponse, RunError> {
197        serve_request_db(
198            &self.wasm,
199            &self.handler,
200            &self.db_path,
201            method,
202            path,
203            body,
204        )
205    }
206
207    /// Drain the topics published on this thread since the last call;
208    /// for every connection subscribed to any of them, re-render
209    /// (stateless: the same request against the new DB state) and
210    /// return `(conn_id, fresh response)` to write down its socket.
211    /// Unsubscribed connections are untouched.
212    pub fn drain_and_push(
213        &mut self,
214    ) -> Result<Vec<(u64, HttpResponse)>, RunError> {
215        let topics = drain_published();
216        if topics.is_empty() {
217            return Ok(Vec::new());
218        }
219        // Snapshot the subscribed connections first so the wasm calls
220        // below don't alias `self.conns` (the live path writes back
221        // each conn's new serialized prior).
222        #[allow(clippy::type_complexity)]
223        let hits: Vec<(usize, u64, String, String, String, String)> = self
224            .conns
225            .iter()
226            .enumerate()
227            .filter(|(_, c)| c.topics.iter().any(|t| topics.contains(t)))
228            .map(|(i, c)| {
229                (
230                    i,
231                    c.id,
232                    c.method.clone(),
233                    c.path.clone(),
234                    c.body.clone(),
235                    c.prev.clone(),
236                )
237            })
238            .collect();
239        let mut out = Vec::new();
240        match self.live_handler.clone() {
241            Some(live) => {
242                for (i, id, method, path, _body, prev) in hits {
243                    // Re-render through `<handler>_live` with the prior
244                    // view as `req.body`: the envelope is the Change
245                    // wire vs that prior, plus the new serialized view.
246                    let env = serve_request_db(
247                        &self.wasm,
248                        &live,
249                        &self.db_path,
250                        &method,
251                        &path,
252                        &prev,
253                    )?;
254                    let (changes, ser) = split_envelope(&env.body);
255                    self.conns[i].prev = ser;
256                    out.push((
257                        id,
258                        HttpResponse {
259                            status: env.status,
260                            body: changes,
261                        },
262                    ));
263                }
264            }
265            None => {
266                for (_, id, method, path, body, _prev) in hits {
267                    // Broadcast-refresh (A1): the conn's stored request
268                    // re-rendered whole, unchanged.
269                    let resp = serve_request_db(
270                        &self.wasm,
271                        &self.handler,
272                        &self.db_path,
273                        &method,
274                        &path,
275                        &body,
276                    )?;
277                    out.push((id, resp));
278                }
279            }
280        }
281        // Subscriber re-renders are SSE-only; discard any headers they
282        // set so they never leak into the next normal HTTP response.
283        let _ = drain_resp_headers();
284        Ok(out)
285    }
286
287    /// Drop a connection by id — called when its socket write fails
288    /// (the client went away). After this the connection is no longer
289    /// re-rendered by `drain_and_push`. Unknown ids are a no-op (an
290    /// already-pruned conn double-reported as dead is not an error).
291    pub fn disconnect(&mut self, id: u64) {
292        self.conns.retain(|c| c.id != id);
293    }
294
295    /// Open connection count (for observability/tests).
296    pub fn connections(&self) -> usize {
297        self.conns.len()
298    }
299
300    /// Diff-push mode (a `live_handler` is set)? Then pushes are the
301    /// Change wire and the SSE stream sends no initial frame — the
302    /// browser already has the server-rendered document and the
303    /// baseline prior is held here. Broadcast mode sends the full page.
304    pub fn is_diff(&self) -> bool {
305        self.live_handler.is_some()
306    }
307}
308
309/// Format an HTML body as one Server-Sent Events message: every line
310/// prefixed `data: `, terminated by a blank line. SSE is the v1 live
311/// transport — it is a plain long-lived HTTP response (no handshake,
312/// no framing protocol), so it composes with the existing blocking
313/// HTTP serve far more simply than WebSocket. The browser rejoins
314/// multi-line `data:` with `\n`.
315pub fn sse_frame(body: &str) -> String {
316    let mut out = String::with_capacity(body.len() + 16);
317    for line in body.split('\n') {
318        out.push_str("data: ");
319        out.push_str(line);
320        out.push('\n');
321    }
322    out.push('\n');
323    out
324}
325
326/// The entire client side of liveness: open an `EventSource` to the
327/// live endpoint, **carrying this page's path+query**, and on each
328/// pushed message either apply the length-prefixed Change wire (A3
329/// diff-push: `^\d+:` — parse, resolve each leaf-to-root path over
330/// `document.body.firstChild`, and SetText/Replace/Append/Truncate via
331/// real DOM ops, building nodes from the same Element codec) or, for a
332/// full-page broadcast (A1), swap `document.body.innerHTML`. One
333/// discriminator, one shim for both modes — framework-emitted, never
334/// authored (the "no wiring" payoff). The host serve loop appends this
335/// to every page render.
336pub fn live_client_script(live_path: &str) -> String {
337    // Raw string (no format! brace-escaping); `__LP__` is the only
338    // interpolation. JS is `;`-separated so the Rust line-continuations
339    // keep it one logical line. The reader threads a byte offset; the
340    // wire is `count ":" change…`, a change is an op byte then the path
341    // `n ":" d ":" …` then the kind's payload (a length-prefixed string
342    // / a recursively-encoded Element / a Truncate length).
343    const JS: &str = r#"<script>(function(){
344var p=encodeURIComponent(location.pathname+location.search);
345var es=new EventSource("__LP__?path="+p);
346function root(){return document.body.firstChild;}
347function ri(s,o){var j=s.indexOf(":",o);return [parseInt(s.slice(o,j),10),j+1];}
348function rs(s,o){var x=ri(s,o);return [s.slice(x[1],x[1]+x[0]),x[1]+x[0]];}
349function rp(s,o){var x=ri(s,o),n=x[0],q=x[1],a=[];for(var i=0;i<n;i++){var y=ri(s,q);a.push(y[0]);q=y[1];}return [a,q];}
350function re(s,o){var t=s[o];o++;if(t=="T"){var x=rs(s,o);return [document.createTextNode(x[0]),x[1]];}var g=rs(s,o);o=g[1];var c=ri(s,o),nk=c[0];o=c[1];var e=document.createElement(g[0]);for(var i=0;i<nk;i++){var k=re(s,o);e.appendChild(k[0]);o=k[1];}return [e,o];}
351function resolve(a){var c=root();for(var i=a.length-1;i>=0;i--){c=c.childNodes[a[i]];}return c;}
352function apply(s){var x=ri(s,0),n=x[0],o=x[1];for(var i=0;i<n;i++){var op=s[o];o++;var pr=rp(s,o),a=pr[0];o=pr[1];if(op=="S"){var v=rs(s,o);o=v[1];resolve(a).textContent=v[0];}else if(op=="R"){var m=re(s,o);o=m[1];resolve(a).replaceWith(m[0]);}else if(op=="A"){var m2=re(s,o);o=m2[1];resolve(a).appendChild(m2[0]);}else if(op=="T"){var l=ri(s,o);o=l[1];var el=resolve(a);while(el.childNodes.length>l[0]){el.removeChild(el.lastChild);}}}}
353es.onmessage=function(ev){var d=ev.data;if(/^[0-9]+:/.test(d)){apply(d);}else{document.body.innerHTML=d;}};
354})();</script>"#;
355    JS.replace("__LP__", live_path)
356}
357
358/// Percent-decode a query-string component. `encodeURIComponent` (the
359/// shim's encoder) emits `%XX` for reserved bytes and never `+` for
360/// space, so we decode `%XX` and pass everything else through. Invalid
361/// escapes are left literal — a best-effort decode never panics.
362fn urldecode(s: &str) -> String {
363    let b = s.as_bytes();
364    let mut out = Vec::with_capacity(b.len());
365    let mut i = 0;
366    while i < b.len() {
367        if b[i] == b'%' && i + 2 < b.len() {
368            let hi = (b[i + 1] as char).to_digit(16);
369            let lo = (b[i + 2] as char).to_digit(16);
370            if let (Some(h), Some(l)) = (hi, lo) {
371                out.push((h * 16 + l) as u8);
372                i += 3;
373                continue;
374            }
375        }
376        out.push(b[i]);
377        i += 1;
378    }
379    String::from_utf8_lossy(&out).into_owned()
380}
381
382/// First value of `key` in an `&`-separated query string, decoded.
383fn query_param(query: &str, key: &str) -> Option<String> {
384    query.split('&').find_map(|kv| {
385        let (k, v) = kv.split_once('=')?;
386        (k == key).then(|| urldecode(v))
387    })
388}
389
390/// The concurrent SSE server (design.md §10, A1). Liveness is gated on
391/// the served handler's `requires Live` (decided upstream); this is the
392/// network shell over the proven [`LiveHub`].
393///
394/// **Concurrency model: a single-threaded accept loop with a parked-
395/// socket registry — deliberately not thread-per-connection.** The
396/// publish seam (`wasm::drain_published`) is thread-local; running every
397/// wasm call (page renders, the SSE initial render, mutating handlers,
398/// and `drain_and_push`) on this one thread makes publish→push correct
399/// for free. The only concurrency actually required is *holding N SSE
400/// responses open while still accepting requests*, which is a registry
401/// of parked `TcpStream`s, not concurrent handler execution. A slow SSE
402/// reader can stall the loop in `write_all`; bounded by a per-push write
403/// timeout + prune-on-error. A thread-pool/async server is the
404/// escalation when a real need forces it (design.md §9, Principle 10),
405/// not stubbed here.
406pub fn serve_http_live(
407    wasm: Vec<u8>,
408    handler: &str,
409    subs: Option<&str>,
410    live: Option<&str>,
411    addr: &str,
412    db_path: &str,
413    live_path: &str,
414) -> Result<(), RunError> {
415    let listener =
416        TcpListener::bind(addr).map_err(|e| RunError::Wasmtime(e.to_string()))?;
417    let mut hub = LiveHub::new(wasm, handler, db_path);
418    // `<handler>_live` present → structural diff push (A3); absent →
419    // broadcast-refresh (A1). Authoring picks neither; the convention does.
420    if let Some(l) = live {
421        hub.set_live_handler(l);
422    }
423    serve_http_live_on(listener, hub, subs, live_path)
424}
425
426/// The accept loop, over a pre-bound listener so it is loopback-testable
427/// (the public entry binds, then calls this). Returns only on a listener
428/// error; like `serve_http`, the loop itself otherwise runs forever.
429fn serve_http_live_on(
430    listener: TcpListener,
431    mut hub: LiveHub,
432    subs: Option<&str>,
433    live_path: &str,
434) -> Result<(), RunError> {
435    // conn id -> its parked long-lived SSE socket.
436    let mut parked: HashMap<u64, TcpStream> = HashMap::new();
437
438    for stream in listener.incoming() {
439        let Ok(mut stream) = stream else { continue };
440        let mut reader = BufReader::new(&stream);
441
442        let mut line = String::new();
443        if reader.read_line(&mut line).is_err() {
444            continue;
445        }
446        let mut parts = line.split_whitespace();
447        let method = parts.next().unwrap_or("GET").to_string();
448        let target = parts.next().unwrap_or("/").to_string();
449
450        let mut content_length = 0usize;
451        loop {
452            let mut h = String::new();
453            if reader.read_line(&mut h).is_err() {
454                break;
455            }
456            let h = h.trim_end();
457            if h.is_empty() {
458                break;
459            }
460            if let Some((name, value)) = h.split_once(':') {
461                if name.eq_ignore_ascii_case("content-length") {
462                    content_length = value.trim().parse().unwrap_or(0);
463                }
464            }
465        }
466        let mut body = String::new();
467        if content_length > 0 {
468            let mut buf = vec![0u8; content_length];
469            if reader.read_exact(&mut buf).is_ok() {
470                body = String::from_utf8_lossy(&buf).into_owned();
471            }
472        }
473        drop(reader);
474
475        let (path, query) = target.split_once('?').unwrap_or((&target, ""));
476
477        // --- The live endpoint: register a long-lived SSE connection. ---
478        if path == live_path {
479            let page = query_param(query, "path").unwrap_or_else(|| "/".into());
480            let opened = match subs {
481                Some(s) => hub.connect_subscribed(s, "GET", &page, ""),
482                None => hub.connect(Vec::new(), "GET", &page, ""),
483            };
484            let Ok((id, resp)) = opened else { continue };
485            // SSE is a plain long-lived HTTP response. In broadcast mode
486            // push the current render immediately (self-heals the
487            // GET↔first-publish race); the body is shim-free so it
488            // never spawns a 2nd EventSource. In diff mode send no
489            // initial frame — the browser already has the server
490            // -rendered document and the baseline prior is held here.
491            let head = "HTTP/1.1 200 OK\r\n\
492                Content-Type: text/event-stream\r\n\
493                Cache-Control: no-cache\r\n\
494                Connection: keep-alive\r\n\r\n";
495            let _ = stream.set_write_timeout(Some(Duration::from_secs(5)));
496            let ok = stream.write_all(head.as_bytes()).is_ok()
497                && (hub.is_diff()
498                    || stream
499                        .write_all(sse_frame(&resp.body).as_bytes())
500                        .is_ok());
501            if ok {
502                parked.insert(id, stream);
503            } else {
504                hub.disconnect(id);
505            }
506            continue;
507        }
508
509        // --- A normal request: run it, then fan any publish out. ---
510        let (status, mut out_body) =
511            match hub.request(&method, &target, &body) {
512                Ok(r) => (r.status, r.body),
513                Err(e) => (500, format!("cairn handler error: {e}")),
514            };
515        // The host appends the live shim to page documents (GET). It is
516        // NOT in pushed SSE frames — the EventSource is created once;
517        // refreshes only replace body content. Zero authored client code.
518        if method.eq_ignore_ascii_case("GET") {
519            out_body.push_str(&live_client_script(live_path));
520        }
521        // The Resp effect: emit headers this request set (same thread,
522        // right after the handler), before the blank line.
523        let extra: String = drain_resp_headers()
524            .into_iter()
525            .map(|(n, v)| format!("{n}: {v}\r\n"))
526            .collect();
527        let resp = format!(
528            "HTTP/1.1 {status} {}\r\nContent-Length: {}\r\n\
529             Content-Type: text/html; charset=utf-8\r\n{extra}\r\n{}",
530            reason_phrase(status),
531            out_body.len(),
532            out_body
533        );
534        let _ = stream.write_all(resp.as_bytes());
535        drop(stream);
536
537        // Fan the publish (if any) out to every subscribed parked conn.
538        let pushes = match hub.drain_and_push() {
539            Ok(p) => p,
540            Err(_) => continue,
541        };
542        for (id, r) in pushes {
543            let frame = sse_frame(&r.body);
544            let dead = match parked.get_mut(&id) {
545                Some(sock) => sock.write_all(frame.as_bytes()).is_err(),
546                None => false,
547            };
548            if dead {
549                parked.remove(&id);
550                hub.disconnect(id);
551            }
552        }
553    }
554    Ok(())
555}
556
557#[cfg(test)]
558mod tests {
559    use super::*;
560    use crate::edit::{Editor, ModuleSpec};
561    use crate::node::{Param, Produces};
562    use crate::store::Store;
563    use crate::ty::{Confidence, Effect, Type};
564    use crate::web;
565    use crate::wasm::lower;
566    use crate::ExprSpec;
567    use std::collections::BTreeSet;
568
569    /// The counter app used by every live test: `app(req)` — `/add`
570    /// bumps a SQLite counter and `publish("items")`, anything else
571    /// renders `count: <v>`; `subs(req)` — `/b` subscribes to "other",
572    /// everything else to "items" (pure, explicit; design.md §10).
573    /// Lowered to wasm; the served handler is `app`, its subscriptions
574    /// handler `subs`.
575    fn counter_app_wasm() -> Vec<u8> {
576        let lit = |s: &str| ExprSpec::Str(s.into());
577        let r = |n: &str| ExprSpec::Ref(n.into());
578        let cat = |a: ExprSpec, b: ExprSpec| {
579            ExprSpec::StrConcat(Box::new(a), Box::new(b))
580        };
581        let field = |b: ExprSpec, t: &str, f: &str| ExprSpec::Field {
582            base: Box::new(b),
583            type_name: t.into(),
584            field: f.into(),
585        };
586        let dbq = |sql: &str| ExprSpec::DbQuery {
587            sql: Box::new(lit(sql)),
588            params: Box::new(ExprSpec::ListEmpty { elem: Type::String }),
589        };
590        let resp = |body: ExprSpec| ExprSpec::Record {
591            type_name: "Response".into(),
592            fields: vec![
593                ("status".into(), ExprSpec::Lit(200)),
594                ("body".into(), body),
595            ],
596        };
597        let create =
598            "CREATE TABLE IF NOT EXISTS ctr(id INTEGER PRIMARY KEY, v INTEGER)";
599        let mut effs = BTreeSet::new();
600        effs.insert(Effect::Db);
601        effs.insert(Effect::Live);
602
603        let app = crate::FunctionSpec {
604            name: "app".into(),
605            type_params: vec![],
606            params: vec![Param {
607                name: "req".into(),
608                ty: Type::Named("Request".into()),
609                min_confidence: Confidence::External,
610            }],
611            produces: Produces {
612                ty: Type::Named("Response".into()),
613                confidence: Confidence::External,
614            },
615            requires: effs,
616            on_failure: vec![],
617            steps: vec![
618                crate::StepSpec {
619                    binding: "path".into(),
620                    value: field(r("req"), "Request", "path"),
621                },
622                crate::StepSpec {
623                    binding: "c".into(),
624                    value: dbq(create),
625                },
626            ],
627            result: ExprSpec::If {
628                cond: Box::new(ExprSpec::StrEq(
629                    Box::new(r("path")),
630                    Box::new(lit("/add")),
631                )),
632                then_branch: Box::new(resp(cat(
633                    cat(
634                        lit("ok"),
635                        dbq("INSERT INTO ctr(id,v) VALUES(1,1) \
636                             ON CONFLICT(id) DO UPDATE SET v=v+1"),
637                    ),
638                    ExprSpec::NumberToStr(Box::new(ExprSpec::Publish(
639                        Box::new(lit("items")),
640                    ))),
641                ))),
642                else_branch: Box::new(resp(cat(
643                    lit("count: "),
644                    dbq("SELECT COALESCE((SELECT v FROM ctr WHERE id=1),0)"),
645                ))),
646            },
647        };
648
649        let subs = crate::FunctionSpec {
650            name: "subs".into(),
651            type_params: vec![],
652            params: vec![Param {
653                name: "req".into(),
654                ty: Type::Named("Request".into()),
655                min_confidence: Confidence::External,
656            }],
657            produces: Produces {
658                ty: Type::Named("Response".into()),
659                confidence: Confidence::External,
660            },
661            requires: BTreeSet::new(),
662            on_failure: vec![],
663            steps: vec![crate::StepSpec {
664                binding: "path".into(),
665                value: field(r("req"), "Request", "path"),
666            }],
667            result: ExprSpec::If {
668                cond: Box::new(ExprSpec::StrEq(
669                    Box::new(r("path")),
670                    Box::new(lit("/b")),
671                )),
672                then_branch: Box::new(resp(lit("other"))),
673                else_branch: Box::new(resp(lit("items"))),
674            },
675        };
676
677        let e = Editor::new(Store::open_in_memory().unwrap());
678        let (m, report) = e
679            .apply_module(&ModuleSpec {
680                name: "live".into(),
681                types: web::types(),
682                functions: vec![app, subs],
683            })
684            .unwrap();
685        assert!(report.ok(), "violations: {:?}", report.violations);
686        lower(e.store(), &m).unwrap()
687    }
688
689    /// publish → targeted re-render. A connection subscribed to "items"
690    /// is refreshed after a request publishes it; one subscribed to
691    /// "other" is not. The whole live loop, in process, no sockets.
692    #[test]
693    fn publish_refreshes_only_subscribed_connections() {
694        let wasm = counter_app_wasm();
695
696        let mut path = std::env::temp_dir();
697        path.push(format!("cairn-live-{}.db", std::process::id()));
698        let _ = std::fs::remove_file(&path);
699        let dbp = path.to_str().unwrap();
700
701        let mut hub = LiveHub::new(wasm, "app", dbp);
702        // Topics derived from the app's own `subs` handler, not hardcoded.
703        let (a, a0) = hub
704            .connect_subscribed("subs", "GET", "/", "")
705            .unwrap();
706        assert!(a0.body.contains("count: 0"), "initial render: {}", a0.body);
707        let (b, _) = hub
708            .connect_subscribed("subs", "GET", "/b", "")
709            .unwrap();
710        assert_eq!(hub.connections(), 2);
711
712        // A request that mutates and publishes "items".
713        hub.request("POST", "/add", "").unwrap();
714        let pushes = hub.drain_and_push().unwrap();
715
716        assert_eq!(pushes.len(), 1, "only the 'items' subscriber refreshes");
717        assert_eq!(pushes[0].0, a, "connection A (subscribed to items)");
718        assert_ne!(pushes[0].0, b, "connection B must not be pushed");
719        assert!(
720            pushes[0].1.body.contains("count: 1"),
721            "refresh reflects new state: {}",
722            pushes[0].1.body
723        );
724
725        // No publish since → nothing to push.
726        assert!(hub.drain_and_push().unwrap().is_empty());
727
728        // Drop the only "items" subscriber (its socket died). A
729        // subsequent publish of "items" must push nothing — the pruned
730        // connection is no longer re-rendered — and the count drops.
731        hub.disconnect(a);
732        assert_eq!(hub.connections(), 1, "B remains after A is pruned");
733        hub.request("POST", "/add", "").unwrap();
734        assert!(
735            hub.drain_and_push().unwrap().is_empty(),
736            "a disconnected connection is never refreshed"
737        );
738        hub.disconnect(a); // double-report of a dead conn is a no-op
739        assert_eq!(hub.connections(), 1);
740        std::fs::remove_file(&path).unwrap();
741    }
742
743    #[test]
744    fn sse_frame_and_client_shim() {
745        // Multi-line body → one `data:` line each, blank-line terminated.
746        assert_eq!(
747            sse_frame("<div>a</div>\n<p>b</p>"),
748            "data: <div>a</div>\ndata: <p>b</p>\n\n"
749        );
750        assert_eq!(sse_frame(""), "data: \n\n");
751        // The shim is an EventSource against the live path carrying the
752        // page path+query, with both modes: apply the Change wire when
753        // it matches `^\d+:`, else swap full HTML.
754        let s = live_client_script("/__live/");
755        assert!(s.contains("new EventSource(\"/__live/?path=\"+p)"));
756        assert!(s.contains("encodeURIComponent(location.pathname+location.search)"));
757        // diff-push branch
758        assert!(s.contains("function apply(s)"));
759        assert!(s.contains("/^[0-9]+:/.test(d)"));
760        assert!(s.contains("resolve(a).textContent=v[0]"));
761        assert!(s.contains("resolve(a).replaceWith"));
762        assert!(s.contains("childNodes[a[i]]"));
763        // broadcast fallback
764        assert!(s.contains("document.body.innerHTML=d"));
765        assert!(s.starts_with("<script>") && s.ends_with("</script>"));
766    }
767
768    /// The whole A1 loop over a real loopback socket: an SSE client
769    /// connects, a separate POST publishes, and the publish is delivered
770    /// down the parked SSE socket as a fresh frame — plus a normal GET
771    /// page carries the auto-injected live shim. Proves the single-
772    /// thread + parked-registry model end to end without a browser.
773    #[test]
774    fn serve_http_live_pushes_a_refresh_over_a_real_socket() {
775        let wasm = counter_app_wasm();
776        let mut path = std::env::temp_dir();
777        path.push(format!("cairn-live-srv-{}.db", std::process::id()));
778        let _ = std::fs::remove_file(&path);
779        let dbp = path.to_str().unwrap().to_string();
780
781        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
782        let port = listener.local_addr().unwrap().port();
783        let hub = LiveHub::new(wasm, "app", &dbp);
784        // The server thread owns every wasm call (and the thread-local
785        // publish seam); the client below does pure TCP — exactly the
786        // single-thread invariant serve_http_live relies on.
787        std::thread::spawn(move || {
788            let _ = serve_http_live_on(listener, hub, Some("subs"), "/__live/");
789        });
790
791        // Read from `s` (5s deadline) until `needle` appears.
792        fn wait_for(s: &mut TcpStream, needle: &str) -> String {
793            s.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
794            let mut acc = String::new();
795            let mut buf = [0u8; 1024];
796            loop {
797                match s.read(&mut buf) {
798                    Ok(0) => break,
799                    Ok(n) => {
800                        acc.push_str(&String::from_utf8_lossy(&buf[..n]));
801                        if acc.contains(needle) {
802                            break;
803                        }
804                    }
805                    Err(_) => break,
806                }
807            }
808            acc
809        }
810        let connect = || loop {
811            if let Ok(s) = TcpStream::connect(("127.0.0.1", port)) {
812                return s;
813            }
814            std::thread::sleep(Duration::from_millis(10));
815        };
816
817        // 1. Open the SSE connection for page "/": event-stream head +
818        //    the current render as the first frame.
819        let mut sse = connect();
820        sse.write_all(b"GET /__live/?path=%2F HTTP/1.1\r\nHost: x\r\n\r\n")
821            .unwrap();
822        let first = wait_for(&mut sse, "count: 0");
823        assert!(first.contains("text/event-stream"), "SSE head: {first}");
824        assert!(first.contains("data: count: 0"), "first frame: {first}");
825
826        // 2. A POST that mutates state and publishes "items".
827        let mut post = connect();
828        post.write_all(
829            b"POST /add HTTP/1.1\r\nHost: x\r\nContent-Length: 0\r\n\r\n",
830        )
831        .unwrap();
832        let _ = wait_for(&mut post, "\r\n\r\n");
833
834        // 3. The publish fans out: the parked SSE socket gets a fresh
835        //    frame reflecting the new state — zero authored client code.
836        let pushed = wait_for(&mut sse, "count: 1");
837        assert!(pushed.contains("data: count: 1"), "no push: {pushed}");
838
839        // 4. A normal GET page carries the auto-injected live shim and
840        //    reflects persisted state.
841        let mut page = connect();
842        page.write_all(b"GET / HTTP/1.1\r\nHost: x\r\n\r\n").unwrap();
843        let doc = wait_for(&mut page, "</script>");
844        assert!(doc.contains("count: 1"), "page state: {doc}");
845        assert!(
846            doc.contains("new EventSource(\"/__live/?path=\"+p)"),
847            "shim not injected: {doc}"
848        );
849
850        let _ = std::fs::remove_file(&path);
851    }
852
853    /// A3 over a real socket: with the `_live` handler the server sends
854    /// the SSE head (no initial frame — the diff baseline is held
855    /// server-side), and a publish delivers the length-prefixed Change
856    /// wire (not the full page) down the parked socket. The document
857    /// still carries the dual-mode shim.
858    #[test]
859    fn serve_http_live_pushes_the_change_wire_over_a_real_socket() {
860        let wasm = tea_live_counter_wasm();
861        let mut path = std::env::temp_dir();
862        path.push(format!("cairn-diffsrv-{}.db", std::process::id()));
863        let _ = std::fs::remove_file(&path);
864        let dbp = path.to_str().unwrap().to_string();
865
866        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
867        let port = listener.local_addr().unwrap().port();
868        let mut hub = LiveHub::new(wasm, "app", &dbp);
869        hub.set_live_handler("app_live");
870        std::thread::spawn(move || {
871            let _ = serve_http_live_on(listener, hub, Some("subs"), "/__live/");
872        });
873
874        fn wait_for(s: &mut TcpStream, needle: &str) -> String {
875            s.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
876            let mut acc = String::new();
877            let mut buf = [0u8; 1024];
878            loop {
879                match s.read(&mut buf) {
880                    Ok(0) => break,
881                    Ok(n) => {
882                        acc.push_str(&String::from_utf8_lossy(&buf[..n]));
883                        if acc.contains(needle) {
884                            break;
885                        }
886                    }
887                    Err(_) => break,
888                }
889            }
890            acc
891        }
892        let connect = || loop {
893            if let Ok(s) = TcpStream::connect(("127.0.0.1", port)) {
894                return s;
895            }
896            std::thread::sleep(Duration::from_millis(10));
897        };
898
899        // 1. The document: server-rendered page + the dual-mode shim.
900        let mut page = connect();
901        page.write_all(b"GET / HTTP/1.1\r\nHost: x\r\n\r\n").unwrap();
902        let doc = wait_for(&mut page, "</script>");
903        assert!(doc.contains("<div>count: 0</div>"), "page: {doc}");
904        assert!(
905            doc.contains("new EventSource(\"/__live/?path=\"+p)")
906                && doc.contains("function apply(s)"),
907            "dual-mode shim missing: {doc}"
908        );
909
910        // 2. SSE: head only (diff mode sends no initial frame).
911        let mut sse = connect();
912        sse.write_all(b"GET /__live/?path=%2F HTTP/1.1\r\nHost: x\r\n\r\n")
913            .unwrap();
914        let head = wait_for(&mut sse, "\r\n\r\n");
915        assert!(head.contains("text/event-stream"), "SSE head: {head}");
916
917        // 3. The action publishes; the parked SSE gets the Change wire.
918        let mut act = connect();
919        act.write_all(b"GET /inc HTTP/1.1\r\nHost: x\r\n\r\n").unwrap();
920        let _ = wait_for(&mut act, "\r\n\r\n");
921        let pushed = wait_for(&mut sse, "count: 1");
922        assert!(
923            pushed.contains("data: 1:S1:0:8:count: 1"),
924            "expected the Change wire, got: {pushed:?}"
925        );
926        assert!(!pushed.contains("<div>"), "must not be full HTML");
927
928        let _ = std::fs::remove_file(&path);
929    }
930
931    /// A counter whose page/action handler `app` mutates+publishes (A1
932    /// style) and whose `app_live(req) = render_live(req.body, cload,
933    /// cview)` is the render-only diff entry; `subs` → topic "c". The
934    /// served handler is `app`, the live handler `app_live`.
935    fn tea_live_counter_wasm() -> Vec<u8> {
936        let lit = |s: &str| ExprSpec::Str(s.into());
937        let r = |n: &str| ExprSpec::Ref(n.into());
938        let cat = |a: ExprSpec, b: ExprSpec| {
939            ExprSpec::StrConcat(Box::new(a), Box::new(b))
940        };
941        let field = |b: ExprSpec, t: &str, f: &str| ExprSpec::Field {
942            base: Box::new(b),
943            type_name: t.into(),
944            field: f.into(),
945        };
946        let dbq = |sql: &str| ExprSpec::DbQuery {
947            sql: Box::new(lit(sql)),
948            params: Box::new(ExprSpec::ListEmpty { elem: Type::String }),
949        };
950        let call = |f: &str, a: Vec<ExprSpec>| ExprSpec::Call {
951            func: f.into(),
952            args: a,
953        };
954        let resp = |body: ExprSpec| ExprSpec::Record {
955            type_name: "Response".into(),
956            fields: vec![
957                ("status".into(), ExprSpec::Lit(200)),
958                ("body".into(), body),
959            ],
960        };
961        let create =
962            "CREATE TABLE IF NOT EXISTS ctr(id INTEGER PRIMARY KEY, v INTEGER)";
963        let mut dbl = BTreeSet::new();
964        dbl.insert(Effect::Db);
965        dbl.insert(Effect::Live);
966        let mut dbo = BTreeSet::new();
967        dbo.insert(Effect::Db);
968
969        // cload() -> Counter @ Db
970        let cload = crate::FunctionSpec {
971            name: "cload".into(),
972            type_params: vec![],
973            params: vec![],
974            produces: Produces {
975                ty: Type::Named("Counter".into()),
976                confidence: Confidence::External,
977            },
978            requires: dbo.clone(),
979            on_failure: vec![],
980            steps: vec![crate::StepSpec {
981                binding: "c".into(),
982                value: dbq(create),
983            }],
984            result: ExprSpec::Record {
985                type_name: "Counter".into(),
986                fields: vec![(
987                    "value".into(),
988                    ExprSpec::StrToNumber(Box::new(dbq(
989                        "SELECT COALESCE((SELECT v FROM ctr WHERE id=1),0)",
990                    ))),
991                )],
992            },
993        };
994        // cview(m) -> Element : <div>count: N</div>
995        let cview = crate::FunctionSpec {
996            name: "cview".into(),
997            type_params: vec![],
998            params: vec![Param {
999                name: "m".into(),
1000                ty: Type::Named("Counter".into()),
1001                min_confidence: Confidence::External,
1002            }],
1003            produces: Produces {
1004                ty: Type::Named("Element".into()),
1005                confidence: Confidence::External,
1006            },
1007            requires: BTreeSet::new(),
1008            on_failure: vec![],
1009            steps: vec![],
1010            result: ExprSpec::Variant {
1011                type_name: "Element".into(),
1012                case: "El".into(),
1013                fields: vec![
1014                    ("tag".into(), lit("div")),
1015                    (
1016                        "kids".into(),
1017                        ExprSpec::List(vec![ExprSpec::Variant {
1018                            type_name: "Element".into(),
1019                            case: "Text".into(),
1020                            fields: vec![(
1021                                "content".into(),
1022                                cat(
1023                                    lit("count: "),
1024                                    ExprSpec::NumberToStr(Box::new(field(
1025                                        r("m"),
1026                                        "Counter",
1027                                        "value",
1028                                    ))),
1029                                ),
1030                            )],
1031                        }]),
1032                    ),
1033                ],
1034            },
1035        };
1036        // app(req): /inc -> bump + publish("c"); else render the page.
1037        let app = crate::FunctionSpec {
1038            name: "app".into(),
1039            type_params: vec![],
1040            params: vec![Param {
1041                name: "req".into(),
1042                ty: Type::Named("Request".into()),
1043                min_confidence: Confidence::External,
1044            }],
1045            produces: Produces {
1046                ty: Type::Named("Response".into()),
1047                confidence: Confidence::External,
1048            },
1049            requires: dbl.clone(),
1050            on_failure: vec![],
1051            steps: vec![
1052                crate::StepSpec {
1053                    binding: "path".into(),
1054                    value: field(r("req"), "Request", "path"),
1055                },
1056                crate::StepSpec {
1057                    binding: "c".into(),
1058                    value: dbq(create),
1059                },
1060            ],
1061            result: ExprSpec::If {
1062                cond: Box::new(ExprSpec::StrEq(
1063                    Box::new(r("path")),
1064                    Box::new(lit("/inc")),
1065                )),
1066                then_branch: Box::new(resp(cat(
1067                    cat(
1068                        lit("ok"),
1069                        dbq("INSERT INTO ctr(id,v) VALUES(1,1) \
1070                             ON CONFLICT(id) DO UPDATE SET v=v+1"),
1071                    ),
1072                    ExprSpec::NumberToStr(Box::new(ExprSpec::Publish(
1073                        Box::new(lit("c")),
1074                    ))),
1075                ))),
1076                else_branch: Box::new(resp(call(
1077                    "render_html",
1078                    vec![call("cview", vec![call("cload", vec![])])],
1079                ))),
1080            },
1081        };
1082        // app_live(req) = render_live(req.body, &cload, &cview)
1083        let app_live = crate::FunctionSpec {
1084            name: "app_live".into(),
1085            type_params: vec![],
1086            params: vec![Param {
1087                name: "req".into(),
1088                ty: Type::Named("Request".into()),
1089                min_confidence: Confidence::External,
1090            }],
1091            produces: Produces {
1092                ty: Type::Named("Response".into()),
1093                confidence: Confidence::External,
1094            },
1095            requires: dbo.clone(),
1096            on_failure: vec![],
1097            steps: vec![],
1098            result: call(
1099                "render_live",
1100                vec![
1101                    field(r("req"), "Request", "body"),
1102                    ExprSpec::FuncRef("cload".into()),
1103                    ExprSpec::FuncRef("cview".into()),
1104                ],
1105            ),
1106        };
1107        // subs(req) -> "c"
1108        let subs = crate::FunctionSpec {
1109            name: "subs".into(),
1110            type_params: vec![],
1111            params: vec![Param {
1112                name: "req".into(),
1113                ty: Type::Named("Request".into()),
1114                min_confidence: Confidence::External,
1115            }],
1116            produces: Produces {
1117                ty: Type::Named("Response".into()),
1118                confidence: Confidence::External,
1119            },
1120            requires: BTreeSet::new(),
1121            on_failure: vec![],
1122            steps: vec![],
1123            result: resp(lit("c")),
1124        };
1125
1126        let mut types = web::types();
1127        types.push(crate::edit::TypeDefSpec::Record {
1128            name: "Counter".into(),
1129            fields: vec![("value".into(), Type::Number)],
1130        });
1131        let mut funcs = web::functions();
1132        funcs.extend([cload, cview, app, app_live, subs]);
1133
1134        let e = Editor::new(Store::open_in_memory().unwrap());
1135        let (m, report) = e
1136            .apply_module(&ModuleSpec {
1137                name: "tealive".into(),
1138                types,
1139                functions: funcs,
1140            })
1141            .unwrap();
1142        assert!(report.ok(), "violations: {:?}", report.violations);
1143        lower(e.store(), &m).unwrap()
1144    }
1145
1146    /// A3 (design.md §10): with a `live_handler` set the hub pushes the
1147    /// length-prefixed **Change wire** (a structural diff vs the prior
1148    /// view), not the full page; the connection's prior is threaded so
1149    /// successive publishes diff against the last sent view. The render
1150    /// path is idempotent — it never re-applies the message.
1151    #[test]
1152    fn diff_push_sends_the_change_wire_not_the_full_page() {
1153        let wasm = tea_live_counter_wasm();
1154        let mut path = std::env::temp_dir();
1155        path.push(format!("cairn-diffpush-{}.db", std::process::id()));
1156        let _ = std::fs::remove_file(&path);
1157        let dbp = path.to_str().unwrap();
1158
1159        let mut hub = LiveHub::new(wasm, "app", dbp);
1160        hub.set_live_handler("app_live");
1161
1162        // connect: render-only first render → the HTML document (not an
1163        // envelope; the hub split it) and a serialized baseline. The
1164        // baseline render must NOT have mutated state.
1165        let (id, doc) = hub
1166            .connect_subscribed("subs", "GET", "/", "")
1167            .unwrap();
1168        assert_eq!(
1169            doc.body, "<div>count: 0</div>",
1170            "first render is the HTML page at current (un-mutated) state"
1171        );
1172
1173        // The action mutates + publishes "c".
1174        hub.request("GET", "/inc", "").unwrap();
1175        let pushes = hub.drain_and_push().unwrap();
1176        assert_eq!(pushes.len(), 1);
1177        assert_eq!(pushes[0].0, id);
1178        // Exactly the Change wire: 1 change, SetText at path [0],
1179        // length-prefixed content "count: 1". Not HTML.
1180        assert_eq!(pushes[0].1.body, "1:S1:0:8:count: 1");
1181        assert!(!pushes[0].1.body.contains("<div>"));
1182
1183        // Prior threading: a second publish diffs against count:1, not
1184        // the original baseline.
1185        hub.request("GET", "/inc", "").unwrap();
1186        let p2 = hub.drain_and_push().unwrap();
1187        assert_eq!(p2.len(), 1);
1188        assert_eq!(p2[0].1.body, "1:S1:0:8:count: 2");
1189
1190        // No publish → nothing.
1191        assert!(hub.drain_and_push().unwrap().is_empty());
1192        std::fs::remove_file(&path).unwrap();
1193    }
1194}