1use crate::wasm::{
22 drain_published, drain_resp_headers, reason_phrase, serve_request_db,
23 HttpResponse, RunError,
24};
25use std::collections::HashMap;
26use std::io::{BufRead, BufReader, Read, Write};
27use std::net::{TcpListener, TcpStream};
28use std::time::Duration;
29
/// One live connection registered with a `LiveHub`.
///
/// Holds everything the hub needs to re-run the page's handler when one of
/// the connection's topics is published.
struct Conn {
    /// Hub-assigned identifier returned to the caller by `connect`.
    id: u64,
    /// HTTP method replayed on every push.
    method: String,
    /// Request path replayed on every push.
    path: String,
    /// Request body replayed on pushes when no live handler is configured.
    body: String,
    /// Topic names this connection listens to; a push happens when any of
    /// them is in the drained published set.
    topics: Vec<String>,
    /// Trailing part of the most recent live-handler envelope. It is sent
    /// back to the live handler as the request body on the next push; empty
    /// when no live handler is configured.
    prev: String,
}
45
46pub struct LiveHub {
51 wasm: Vec<u8>,
52 handler: String,
53 live_handler: Option<String>,
54 db_path: String,
55 next: u64,
56 conns: Vec<Conn>,
57}
58
/// Splits a length-prefixed envelope `"<n>:<payload><rest>"` into
/// `(payload, rest)`, where `<n>` is the payload length in bytes.
///
/// When `body` is not a well-formed envelope the whole input is returned as
/// the first element and the second is empty. That covers: no `:`, a prefix
/// that does not parse as `usize`, a length that runs past the end of the
/// input, and a length that would cut a multi-byte UTF-8 character in two.
/// The last two cases never panic, including for lengths near `usize::MAX`.
fn split_envelope(body: &str) -> (String, String) {
    let not_an_envelope = || (body.to_string(), String::new());

    let Some((prefix, tail)) = body.split_once(':') else {
        return not_an_envelope();
    };
    let Ok(n) = prefix.parse::<usize>() else {
        return not_an_envelope();
    };
    // Compare against the tail instead of computing `colon + 1 + n`, which
    // could overflow; `is_char_boundary` guards the split against landing
    // inside a multi-byte character.
    if n > tail.len() || !tail.is_char_boundary(n) {
        return not_an_envelope();
    }
    let (payload, rest) = tail.split_at(n);
    (payload.to_string(), rest.to_string())
}
78
79impl LiveHub {
80 pub fn new(wasm: Vec<u8>, handler: &str, db_path: &str) -> Self {
81 Self {
82 wasm,
83 handler: handler.into(),
84 live_handler: None,
85 db_path: db_path.into(),
86 next: 1,
87 conns: Vec::new(),
88 }
89 }
90
91 pub fn set_live_handler(&mut self, name: &str) {
96 self.live_handler = Some(name.into());
97 }
98
99 pub fn connect(
104 &mut self,
105 topics: Vec<String>,
106 method: &str,
107 path: &str,
108 body: &str,
109 ) -> Result<(u64, HttpResponse), RunError> {
110 let (resp, prev) = if let Some(live) = self.live_handler.clone() {
115 let env = serve_request_db(
116 &self.wasm,
117 &live,
118 &self.db_path,
119 method,
120 path,
121 "",
122 )?;
123 let (html, ser) = split_envelope(&env.body);
124 (
125 HttpResponse {
126 status: env.status,
127 body: html,
128 },
129 ser,
130 )
131 } else {
132 let resp = serve_request_db(
133 &self.wasm,
134 &self.handler,
135 &self.db_path,
136 method,
137 path,
138 body,
139 )?;
140 (resp, String::new())
141 };
142 let _ = drain_published();
143 let _ = drain_resp_headers(); let id = self.next;
145 self.next += 1;
146 self.conns.push(Conn {
147 id,
148 topics,
149 method: method.into(),
150 path: path.into(),
151 body: body.into(),
152 prev,
153 });
154 Ok((id, resp))
155 }
156
157 pub fn connect_subscribed(
164 &mut self,
165 subs_handler: &str,
166 method: &str,
167 path: &str,
168 body: &str,
169 ) -> Result<(u64, HttpResponse), RunError> {
170 let subs = serve_request_db(
171 &self.wasm,
172 subs_handler,
173 &self.db_path,
174 method,
175 path,
176 body,
177 )?;
178 let _ = drain_published();
179 let _ = drain_resp_headers();
180 let topics: Vec<String> = subs
181 .body
182 .split_whitespace()
183 .map(str::to_string)
184 .collect();
185 self.connect(topics, method, path, body)
186 }
187
188 pub fn request(
192 &self,
193 method: &str,
194 path: &str,
195 body: &str,
196 ) -> Result<HttpResponse, RunError> {
197 serve_request_db(
198 &self.wasm,
199 &self.handler,
200 &self.db_path,
201 method,
202 path,
203 body,
204 )
205 }
206
207 pub fn drain_and_push(
213 &mut self,
214 ) -> Result<Vec<(u64, HttpResponse)>, RunError> {
215 let topics = drain_published();
216 if topics.is_empty() {
217 return Ok(Vec::new());
218 }
219 #[allow(clippy::type_complexity)]
223 let hits: Vec<(usize, u64, String, String, String, String)> = self
224 .conns
225 .iter()
226 .enumerate()
227 .filter(|(_, c)| c.topics.iter().any(|t| topics.contains(t)))
228 .map(|(i, c)| {
229 (
230 i,
231 c.id,
232 c.method.clone(),
233 c.path.clone(),
234 c.body.clone(),
235 c.prev.clone(),
236 )
237 })
238 .collect();
239 let mut out = Vec::new();
240 match self.live_handler.clone() {
241 Some(live) => {
242 for (i, id, method, path, _body, prev) in hits {
243 let env = serve_request_db(
247 &self.wasm,
248 &live,
249 &self.db_path,
250 &method,
251 &path,
252 &prev,
253 )?;
254 let (changes, ser) = split_envelope(&env.body);
255 self.conns[i].prev = ser;
256 out.push((
257 id,
258 HttpResponse {
259 status: env.status,
260 body: changes,
261 },
262 ));
263 }
264 }
265 None => {
266 for (_, id, method, path, body, _prev) in hits {
267 let resp = serve_request_db(
270 &self.wasm,
271 &self.handler,
272 &self.db_path,
273 &method,
274 &path,
275 &body,
276 )?;
277 out.push((id, resp));
278 }
279 }
280 }
281 let _ = drain_resp_headers();
284 Ok(out)
285 }
286
287 pub fn disconnect(&mut self, id: u64) {
292 self.conns.retain(|c| c.id != id);
293 }
294
295 pub fn connections(&self) -> usize {
297 self.conns.len()
298 }
299
300 pub fn is_diff(&self) -> bool {
305 self.live_handler.is_some()
306 }
307}
308
/// Encodes `body` as one server-sent event: a `data: ` line per line of
/// `body`, followed by the blank line that ends the event.
///
/// `\n`, `\r\n` and a bare `\r` are all treated as line breaks and written as
/// `\n`. An event-stream parser ends a line at any of the three, so a bare
/// `\r` left inside a `data:` line would cut the payload short and let the
/// rest be read as a separate field.
///
/// An empty `body` still produces a single empty `data: ` line.
pub fn sse_frame(body: &str) -> String {
    let mut out = String::with_capacity(body.len() + 16);
    let mut rest = body;
    loop {
        out.push_str("data: ");
        match rest.find(|c: char| c == '\r' || c == '\n') {
            Some(at) => {
                out.push_str(&rest[..at]);
                out.push('\n');
                // A CRLF pair is a single line break, not two.
                let width = if rest[at..].starts_with("\r\n") { 2 } else { 1 };
                rest = &rest[at + width..];
            }
            None => {
                out.push_str(rest);
                out.push('\n');
                break;
            }
        }
    }
    out.push('\n');
    out
}
325
/// Returns the `<script>` element that connects a page to the live endpoint.
///
/// The script opens an `EventSource` on `live_path` with the page's own
/// path and query, URI-encoded, as the `path` parameter. A message whose data
/// starts with `<digits>:` is applied as a list of changes; any other message
/// replaces `document.body.innerHTML`. Change targets are addressed from
/// `document.body.firstChild` by child indices, applied last to first. The
/// operations are `S` (set text content), `R` (replace node), `A` (append
/// child) and `T` (truncate children to a count).
///
/// `live_path` is escaped for the double-quoted JavaScript string it is
/// placed in: `\`, `"`, CR and LF are backslash-escaped and `<` is written as
/// `\x3c`, so the value cannot end the string or the `<script>` element.
pub fn live_client_script(live_path: &str) -> String {
    const JS: &str = r#"<script>(function(){
var p=encodeURIComponent(location.pathname+location.search);
var es=new EventSource("__LP__?path="+p);
function root(){return document.body.firstChild;}
function ri(s,o){var j=s.indexOf(":",o);return [parseInt(s.slice(o,j),10),j+1];}
function rs(s,o){var x=ri(s,o);return [s.slice(x[1],x[1]+x[0]),x[1]+x[0]];}
function rp(s,o){var x=ri(s,o),n=x[0],q=x[1],a=[];for(var i=0;i<n;i++){var y=ri(s,q);a.push(y[0]);q=y[1];}return [a,q];}
function re(s,o){var t=s[o];o++;if(t=="T"){var x=rs(s,o);return [document.createTextNode(x[0]),x[1]];}var g=rs(s,o);o=g[1];var c=ri(s,o),nk=c[0];o=c[1];var e=document.createElement(g[0]);for(var i=0;i<nk;i++){var k=re(s,o);e.appendChild(k[0]);o=k[1];}return [e,o];}
function resolve(a){var c=root();for(var i=a.length-1;i>=0;i--){c=c.childNodes[a[i]];}return c;}
function apply(s){var x=ri(s,0),n=x[0],o=x[1];for(var i=0;i<n;i++){var op=s[o];o++;var pr=rp(s,o),a=pr[0];o=pr[1];if(op=="S"){var v=rs(s,o);o=v[1];resolve(a).textContent=v[0];}else if(op=="R"){var m=re(s,o);o=m[1];resolve(a).replaceWith(m[0]);}else if(op=="A"){var m2=re(s,o);o=m2[1];resolve(a).appendChild(m2[0]);}else if(op=="T"){var l=ri(s,o);o=l[1];var el=resolve(a);while(el.childNodes.length>l[0]){el.removeChild(el.lastChild);}}}}
es.onmessage=function(ev){var d=ev.data;if(/^[0-9]+:/.test(d)){apply(d);}else{document.body.innerHTML=d;}};
})();</script>"#;

    let mut escaped = String::with_capacity(live_path.len());
    for ch in live_path.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            // Keeps a literal "</script>" in the path from closing the element.
            '<' => escaped.push_str("\\x3c"),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            other => escaped.push(other),
        }
    }
    JS.replace("__LP__", &escaped)
}
357
/// Decodes `%XX` escapes in `s`.
///
/// A `%` that is not followed by two hexadecimal digits is copied through
/// unchanged, as is every other byte (`+` is not turned into a space). The
/// decoded bytes are converted to a string lossily, so invalid UTF-8 becomes
/// U+FFFD.
fn urldecode(s: &str) -> String {
    // Value of one ASCII hex digit, if `byte` is one.
    fn nibble(byte: u8) -> Option<u8> {
        (byte as char).to_digit(16).map(|d| d as u8)
    }

    let mut decoded = Vec::with_capacity(s.len());
    let mut rest = s.as_bytes();
    while let Some((&first, tail)) = rest.split_first() {
        if first == b'%' {
            if let [hi, lo, after @ ..] = tail {
                if let (Some(hi), Some(lo)) = (nibble(*hi), nibble(*lo)) {
                    decoded.push(hi * 16 + lo);
                    rest = after;
                    continue;
                }
            }
        }
        decoded.push(first);
        rest = tail;
    }
    String::from_utf8_lossy(&decoded).into_owned()
}
381
382fn query_param(query: &str, key: &str) -> Option<String> {
384 query.split('&').find_map(|kv| {
385 let (k, v) = kv.split_once('=')?;
386 (k == key).then(|| urldecode(v))
387 })
388}
389
390pub fn serve_http_live(
407 wasm: Vec<u8>,
408 handler: &str,
409 subs: Option<&str>,
410 live: Option<&str>,
411 addr: &str,
412 db_path: &str,
413 live_path: &str,
414) -> Result<(), RunError> {
415 let listener =
416 TcpListener::bind(addr).map_err(|e| RunError::Wasmtime(e.to_string()))?;
417 let mut hub = LiveHub::new(wasm, handler, db_path);
418 if let Some(l) = live {
421 hub.set_live_handler(l);
422 }
423 serve_http_live_on(listener, hub, subs, live_path)
424}
425
426fn serve_http_live_on(
430 listener: TcpListener,
431 mut hub: LiveHub,
432 subs: Option<&str>,
433 live_path: &str,
434) -> Result<(), RunError> {
435 let mut parked: HashMap<u64, TcpStream> = HashMap::new();
437
438 for stream in listener.incoming() {
439 let Ok(mut stream) = stream else { continue };
440 let mut reader = BufReader::new(&stream);
441
442 let mut line = String::new();
443 if reader.read_line(&mut line).is_err() {
444 continue;
445 }
446 let mut parts = line.split_whitespace();
447 let method = parts.next().unwrap_or("GET").to_string();
448 let target = parts.next().unwrap_or("/").to_string();
449
450 let mut content_length = 0usize;
451 loop {
452 let mut h = String::new();
453 if reader.read_line(&mut h).is_err() {
454 break;
455 }
456 let h = h.trim_end();
457 if h.is_empty() {
458 break;
459 }
460 if let Some((name, value)) = h.split_once(':') {
461 if name.eq_ignore_ascii_case("content-length") {
462 content_length = value.trim().parse().unwrap_or(0);
463 }
464 }
465 }
466 let mut body = String::new();
467 if content_length > 0 {
468 let mut buf = vec![0u8; content_length];
469 if reader.read_exact(&mut buf).is_ok() {
470 body = String::from_utf8_lossy(&buf).into_owned();
471 }
472 }
473 drop(reader);
474
475 let (path, query) = target.split_once('?').unwrap_or((&target, ""));
476
477 if path == live_path {
479 let page = query_param(query, "path").unwrap_or_else(|| "/".into());
480 let opened = match subs {
481 Some(s) => hub.connect_subscribed(s, "GET", &page, ""),
482 None => hub.connect(Vec::new(), "GET", &page, ""),
483 };
484 let Ok((id, resp)) = opened else { continue };
485 let head = "HTTP/1.1 200 OK\r\n\
492 Content-Type: text/event-stream\r\n\
493 Cache-Control: no-cache\r\n\
494 Connection: keep-alive\r\n\r\n";
495 let _ = stream.set_write_timeout(Some(Duration::from_secs(5)));
496 let ok = stream.write_all(head.as_bytes()).is_ok()
497 && (hub.is_diff()
498 || stream
499 .write_all(sse_frame(&resp.body).as_bytes())
500 .is_ok());
501 if ok {
502 parked.insert(id, stream);
503 } else {
504 hub.disconnect(id);
505 }
506 continue;
507 }
508
509 let (status, mut out_body) =
511 match hub.request(&method, &target, &body) {
512 Ok(r) => (r.status, r.body),
513 Err(e) => (500, format!("cairn handler error: {e}")),
514 };
515 if method.eq_ignore_ascii_case("GET") {
519 out_body.push_str(&live_client_script(live_path));
520 }
521 let extra: String = drain_resp_headers()
524 .into_iter()
525 .map(|(n, v)| format!("{n}: {v}\r\n"))
526 .collect();
527 let resp = format!(
528 "HTTP/1.1 {status} {}\r\nContent-Length: {}\r\n\
529 Content-Type: text/html; charset=utf-8\r\n{extra}\r\n{}",
530 reason_phrase(status),
531 out_body.len(),
532 out_body
533 );
534 let _ = stream.write_all(resp.as_bytes());
535 drop(stream);
536
537 let pushes = match hub.drain_and_push() {
539 Ok(p) => p,
540 Err(_) => continue,
541 };
542 for (id, r) in pushes {
543 let frame = sse_frame(&r.body);
544 let dead = match parked.get_mut(&id) {
545 Some(sock) => sock.write_all(frame.as_bytes()).is_err(),
546 None => false,
547 };
548 if dead {
549 parked.remove(&id);
550 hub.disconnect(id);
551 }
552 }
553 }
554 Ok(())
555}
556
557#[cfg(test)]
558mod tests {
559 use super::*;
560 use crate::edit::{Editor, ModuleSpec};
561 use crate::node::{Param, Produces};
562 use crate::store::Store;
563 use crate::ty::{Confidence, Effect, Type};
564 use crate::web;
565 use crate::wasm::lower;
566 use crate::ExprSpec;
567 use std::collections::BTreeSet;
568
569 fn counter_app_wasm() -> Vec<u8> {
576 let lit = |s: &str| ExprSpec::Str(s.into());
577 let r = |n: &str| ExprSpec::Ref(n.into());
578 let cat = |a: ExprSpec, b: ExprSpec| {
579 ExprSpec::StrConcat(Box::new(a), Box::new(b))
580 };
581 let field = |b: ExprSpec, t: &str, f: &str| ExprSpec::Field {
582 base: Box::new(b),
583 type_name: t.into(),
584 field: f.into(),
585 };
586 let dbq = |sql: &str| ExprSpec::DbQuery {
587 sql: Box::new(lit(sql)),
588 params: Box::new(ExprSpec::ListEmpty { elem: Type::String }),
589 };
590 let resp = |body: ExprSpec| ExprSpec::Record {
591 type_name: "Response".into(),
592 fields: vec![
593 ("status".into(), ExprSpec::Lit(200)),
594 ("body".into(), body),
595 ],
596 };
597 let create =
598 "CREATE TABLE IF NOT EXISTS ctr(id INTEGER PRIMARY KEY, v INTEGER)";
599 let mut effs = BTreeSet::new();
600 effs.insert(Effect::Db);
601 effs.insert(Effect::Live);
602
603 let app = crate::FunctionSpec {
604 name: "app".into(),
605 type_params: vec![],
606 params: vec![Param {
607 name: "req".into(),
608 ty: Type::Named("Request".into()),
609 min_confidence: Confidence::External,
610 }],
611 produces: Produces {
612 ty: Type::Named("Response".into()),
613 confidence: Confidence::External,
614 },
615 requires: effs,
616 on_failure: vec![],
617 steps: vec![
618 crate::StepSpec {
619 binding: "path".into(),
620 value: field(r("req"), "Request", "path"),
621 },
622 crate::StepSpec {
623 binding: "c".into(),
624 value: dbq(create),
625 },
626 ],
627 result: ExprSpec::If {
628 cond: Box::new(ExprSpec::StrEq(
629 Box::new(r("path")),
630 Box::new(lit("/add")),
631 )),
632 then_branch: Box::new(resp(cat(
633 cat(
634 lit("ok"),
635 dbq("INSERT INTO ctr(id,v) VALUES(1,1) \
636 ON CONFLICT(id) DO UPDATE SET v=v+1"),
637 ),
638 ExprSpec::NumberToStr(Box::new(ExprSpec::Publish(
639 Box::new(lit("items")),
640 ))),
641 ))),
642 else_branch: Box::new(resp(cat(
643 lit("count: "),
644 dbq("SELECT COALESCE((SELECT v FROM ctr WHERE id=1),0)"),
645 ))),
646 },
647 };
648
649 let subs = crate::FunctionSpec {
650 name: "subs".into(),
651 type_params: vec![],
652 params: vec![Param {
653 name: "req".into(),
654 ty: Type::Named("Request".into()),
655 min_confidence: Confidence::External,
656 }],
657 produces: Produces {
658 ty: Type::Named("Response".into()),
659 confidence: Confidence::External,
660 },
661 requires: BTreeSet::new(),
662 on_failure: vec![],
663 steps: vec![crate::StepSpec {
664 binding: "path".into(),
665 value: field(r("req"), "Request", "path"),
666 }],
667 result: ExprSpec::If {
668 cond: Box::new(ExprSpec::StrEq(
669 Box::new(r("path")),
670 Box::new(lit("/b")),
671 )),
672 then_branch: Box::new(resp(lit("other"))),
673 else_branch: Box::new(resp(lit("items"))),
674 },
675 };
676
677 let e = Editor::new(Store::open_in_memory().unwrap());
678 let (m, report) = e
679 .apply_module(&ModuleSpec {
680 name: "live".into(),
681 types: web::types(),
682 functions: vec![app, subs],
683 })
684 .unwrap();
685 assert!(report.ok(), "violations: {:?}", report.violations);
686 lower(e.store(), &m).unwrap()
687 }
688
689 #[test]
693 fn publish_refreshes_only_subscribed_connections() {
694 let wasm = counter_app_wasm();
695
696 let mut path = std::env::temp_dir();
697 path.push(format!("cairn-live-{}.db", std::process::id()));
698 let _ = std::fs::remove_file(&path);
699 let dbp = path.to_str().unwrap();
700
701 let mut hub = LiveHub::new(wasm, "app", dbp);
702 let (a, a0) = hub
704 .connect_subscribed("subs", "GET", "/", "")
705 .unwrap();
706 assert!(a0.body.contains("count: 0"), "initial render: {}", a0.body);
707 let (b, _) = hub
708 .connect_subscribed("subs", "GET", "/b", "")
709 .unwrap();
710 assert_eq!(hub.connections(), 2);
711
712 hub.request("POST", "/add", "").unwrap();
714 let pushes = hub.drain_and_push().unwrap();
715
716 assert_eq!(pushes.len(), 1, "only the 'items' subscriber refreshes");
717 assert_eq!(pushes[0].0, a, "connection A (subscribed to items)");
718 assert_ne!(pushes[0].0, b, "connection B must not be pushed");
719 assert!(
720 pushes[0].1.body.contains("count: 1"),
721 "refresh reflects new state: {}",
722 pushes[0].1.body
723 );
724
725 assert!(hub.drain_and_push().unwrap().is_empty());
727
728 hub.disconnect(a);
732 assert_eq!(hub.connections(), 1, "B remains after A is pruned");
733 hub.request("POST", "/add", "").unwrap();
734 assert!(
735 hub.drain_and_push().unwrap().is_empty(),
736 "a disconnected connection is never refreshed"
737 );
738 hub.disconnect(a); assert_eq!(hub.connections(), 1);
740 std::fs::remove_file(&path).unwrap();
741 }
742
743 #[test]
744 fn sse_frame_and_client_shim() {
745 assert_eq!(
747 sse_frame("<div>a</div>\n<p>b</p>"),
748 "data: <div>a</div>\ndata: <p>b</p>\n\n"
749 );
750 assert_eq!(sse_frame(""), "data: \n\n");
751 let s = live_client_script("/__live/");
755 assert!(s.contains("new EventSource(\"/__live/?path=\"+p)"));
756 assert!(s.contains("encodeURIComponent(location.pathname+location.search)"));
757 assert!(s.contains("function apply(s)"));
759 assert!(s.contains("/^[0-9]+:/.test(d)"));
760 assert!(s.contains("resolve(a).textContent=v[0]"));
761 assert!(s.contains("resolve(a).replaceWith"));
762 assert!(s.contains("childNodes[a[i]]"));
763 assert!(s.contains("document.body.innerHTML=d"));
765 assert!(s.starts_with("<script>") && s.ends_with("</script>"));
766 }
767
768 #[test]
774 fn serve_http_live_pushes_a_refresh_over_a_real_socket() {
775 let wasm = counter_app_wasm();
776 let mut path = std::env::temp_dir();
777 path.push(format!("cairn-live-srv-{}.db", std::process::id()));
778 let _ = std::fs::remove_file(&path);
779 let dbp = path.to_str().unwrap().to_string();
780
781 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
782 let port = listener.local_addr().unwrap().port();
783 let hub = LiveHub::new(wasm, "app", &dbp);
784 std::thread::spawn(move || {
788 let _ = serve_http_live_on(listener, hub, Some("subs"), "/__live/");
789 });
790
791 fn wait_for(s: &mut TcpStream, needle: &str) -> String {
793 s.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
794 let mut acc = String::new();
795 let mut buf = [0u8; 1024];
796 loop {
797 match s.read(&mut buf) {
798 Ok(0) => break,
799 Ok(n) => {
800 acc.push_str(&String::from_utf8_lossy(&buf[..n]));
801 if acc.contains(needle) {
802 break;
803 }
804 }
805 Err(_) => break,
806 }
807 }
808 acc
809 }
810 let connect = || loop {
811 if let Ok(s) = TcpStream::connect(("127.0.0.1", port)) {
812 return s;
813 }
814 std::thread::sleep(Duration::from_millis(10));
815 };
816
817 let mut sse = connect();
820 sse.write_all(b"GET /__live/?path=%2F HTTP/1.1\r\nHost: x\r\n\r\n")
821 .unwrap();
822 let first = wait_for(&mut sse, "count: 0");
823 assert!(first.contains("text/event-stream"), "SSE head: {first}");
824 assert!(first.contains("data: count: 0"), "first frame: {first}");
825
826 let mut post = connect();
828 post.write_all(
829 b"POST /add HTTP/1.1\r\nHost: x\r\nContent-Length: 0\r\n\r\n",
830 )
831 .unwrap();
832 let _ = wait_for(&mut post, "\r\n\r\n");
833
834 let pushed = wait_for(&mut sse, "count: 1");
837 assert!(pushed.contains("data: count: 1"), "no push: {pushed}");
838
839 let mut page = connect();
842 page.write_all(b"GET / HTTP/1.1\r\nHost: x\r\n\r\n").unwrap();
843 let doc = wait_for(&mut page, "</script>");
844 assert!(doc.contains("count: 1"), "page state: {doc}");
845 assert!(
846 doc.contains("new EventSource(\"/__live/?path=\"+p)"),
847 "shim not injected: {doc}"
848 );
849
850 let _ = std::fs::remove_file(&path);
851 }
852
853 #[test]
859 fn serve_http_live_pushes_the_change_wire_over_a_real_socket() {
860 let wasm = tea_live_counter_wasm();
861 let mut path = std::env::temp_dir();
862 path.push(format!("cairn-diffsrv-{}.db", std::process::id()));
863 let _ = std::fs::remove_file(&path);
864 let dbp = path.to_str().unwrap().to_string();
865
866 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
867 let port = listener.local_addr().unwrap().port();
868 let mut hub = LiveHub::new(wasm, "app", &dbp);
869 hub.set_live_handler("app_live");
870 std::thread::spawn(move || {
871 let _ = serve_http_live_on(listener, hub, Some("subs"), "/__live/");
872 });
873
874 fn wait_for(s: &mut TcpStream, needle: &str) -> String {
875 s.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
876 let mut acc = String::new();
877 let mut buf = [0u8; 1024];
878 loop {
879 match s.read(&mut buf) {
880 Ok(0) => break,
881 Ok(n) => {
882 acc.push_str(&String::from_utf8_lossy(&buf[..n]));
883 if acc.contains(needle) {
884 break;
885 }
886 }
887 Err(_) => break,
888 }
889 }
890 acc
891 }
892 let connect = || loop {
893 if let Ok(s) = TcpStream::connect(("127.0.0.1", port)) {
894 return s;
895 }
896 std::thread::sleep(Duration::from_millis(10));
897 };
898
899 let mut page = connect();
901 page.write_all(b"GET / HTTP/1.1\r\nHost: x\r\n\r\n").unwrap();
902 let doc = wait_for(&mut page, "</script>");
903 assert!(doc.contains("<div>count: 0</div>"), "page: {doc}");
904 assert!(
905 doc.contains("new EventSource(\"/__live/?path=\"+p)")
906 && doc.contains("function apply(s)"),
907 "dual-mode shim missing: {doc}"
908 );
909
910 let mut sse = connect();
912 sse.write_all(b"GET /__live/?path=%2F HTTP/1.1\r\nHost: x\r\n\r\n")
913 .unwrap();
914 let head = wait_for(&mut sse, "\r\n\r\n");
915 assert!(head.contains("text/event-stream"), "SSE head: {head}");
916
917 let mut act = connect();
919 act.write_all(b"GET /inc HTTP/1.1\r\nHost: x\r\n\r\n").unwrap();
920 let _ = wait_for(&mut act, "\r\n\r\n");
921 let pushed = wait_for(&mut sse, "count: 1");
922 assert!(
923 pushed.contains("data: 1:S1:0:8:count: 1"),
924 "expected the Change wire, got: {pushed:?}"
925 );
926 assert!(!pushed.contains("<div>"), "must not be full HTML");
927
928 let _ = std::fs::remove_file(&path);
929 }
930
931 fn tea_live_counter_wasm() -> Vec<u8> {
936 let lit = |s: &str| ExprSpec::Str(s.into());
937 let r = |n: &str| ExprSpec::Ref(n.into());
938 let cat = |a: ExprSpec, b: ExprSpec| {
939 ExprSpec::StrConcat(Box::new(a), Box::new(b))
940 };
941 let field = |b: ExprSpec, t: &str, f: &str| ExprSpec::Field {
942 base: Box::new(b),
943 type_name: t.into(),
944 field: f.into(),
945 };
946 let dbq = |sql: &str| ExprSpec::DbQuery {
947 sql: Box::new(lit(sql)),
948 params: Box::new(ExprSpec::ListEmpty { elem: Type::String }),
949 };
950 let call = |f: &str, a: Vec<ExprSpec>| ExprSpec::Call {
951 func: f.into(),
952 args: a,
953 };
954 let resp = |body: ExprSpec| ExprSpec::Record {
955 type_name: "Response".into(),
956 fields: vec![
957 ("status".into(), ExprSpec::Lit(200)),
958 ("body".into(), body),
959 ],
960 };
961 let create =
962 "CREATE TABLE IF NOT EXISTS ctr(id INTEGER PRIMARY KEY, v INTEGER)";
963 let mut dbl = BTreeSet::new();
964 dbl.insert(Effect::Db);
965 dbl.insert(Effect::Live);
966 let mut dbo = BTreeSet::new();
967 dbo.insert(Effect::Db);
968
969 let cload = crate::FunctionSpec {
971 name: "cload".into(),
972 type_params: vec![],
973 params: vec![],
974 produces: Produces {
975 ty: Type::Named("Counter".into()),
976 confidence: Confidence::External,
977 },
978 requires: dbo.clone(),
979 on_failure: vec![],
980 steps: vec![crate::StepSpec {
981 binding: "c".into(),
982 value: dbq(create),
983 }],
984 result: ExprSpec::Record {
985 type_name: "Counter".into(),
986 fields: vec![(
987 "value".into(),
988 ExprSpec::StrToNumber(Box::new(dbq(
989 "SELECT COALESCE((SELECT v FROM ctr WHERE id=1),0)",
990 ))),
991 )],
992 },
993 };
994 let cview = crate::FunctionSpec {
996 name: "cview".into(),
997 type_params: vec![],
998 params: vec![Param {
999 name: "m".into(),
1000 ty: Type::Named("Counter".into()),
1001 min_confidence: Confidence::External,
1002 }],
1003 produces: Produces {
1004 ty: Type::Named("Element".into()),
1005 confidence: Confidence::External,
1006 },
1007 requires: BTreeSet::new(),
1008 on_failure: vec![],
1009 steps: vec![],
1010 result: ExprSpec::Variant {
1011 type_name: "Element".into(),
1012 case: "El".into(),
1013 fields: vec![
1014 ("tag".into(), lit("div")),
1015 (
1016 "kids".into(),
1017 ExprSpec::List(vec![ExprSpec::Variant {
1018 type_name: "Element".into(),
1019 case: "Text".into(),
1020 fields: vec![(
1021 "content".into(),
1022 cat(
1023 lit("count: "),
1024 ExprSpec::NumberToStr(Box::new(field(
1025 r("m"),
1026 "Counter",
1027 "value",
1028 ))),
1029 ),
1030 )],
1031 }]),
1032 ),
1033 ],
1034 },
1035 };
1036 let app = crate::FunctionSpec {
1038 name: "app".into(),
1039 type_params: vec![],
1040 params: vec![Param {
1041 name: "req".into(),
1042 ty: Type::Named("Request".into()),
1043 min_confidence: Confidence::External,
1044 }],
1045 produces: Produces {
1046 ty: Type::Named("Response".into()),
1047 confidence: Confidence::External,
1048 },
1049 requires: dbl.clone(),
1050 on_failure: vec![],
1051 steps: vec![
1052 crate::StepSpec {
1053 binding: "path".into(),
1054 value: field(r("req"), "Request", "path"),
1055 },
1056 crate::StepSpec {
1057 binding: "c".into(),
1058 value: dbq(create),
1059 },
1060 ],
1061 result: ExprSpec::If {
1062 cond: Box::new(ExprSpec::StrEq(
1063 Box::new(r("path")),
1064 Box::new(lit("/inc")),
1065 )),
1066 then_branch: Box::new(resp(cat(
1067 cat(
1068 lit("ok"),
1069 dbq("INSERT INTO ctr(id,v) VALUES(1,1) \
1070 ON CONFLICT(id) DO UPDATE SET v=v+1"),
1071 ),
1072 ExprSpec::NumberToStr(Box::new(ExprSpec::Publish(
1073 Box::new(lit("c")),
1074 ))),
1075 ))),
1076 else_branch: Box::new(resp(call(
1077 "render_html",
1078 vec![call("cview", vec![call("cload", vec![])])],
1079 ))),
1080 },
1081 };
1082 let app_live = crate::FunctionSpec {
1084 name: "app_live".into(),
1085 type_params: vec![],
1086 params: vec![Param {
1087 name: "req".into(),
1088 ty: Type::Named("Request".into()),
1089 min_confidence: Confidence::External,
1090 }],
1091 produces: Produces {
1092 ty: Type::Named("Response".into()),
1093 confidence: Confidence::External,
1094 },
1095 requires: dbo.clone(),
1096 on_failure: vec![],
1097 steps: vec![],
1098 result: call(
1099 "render_live",
1100 vec![
1101 field(r("req"), "Request", "body"),
1102 ExprSpec::FuncRef("cload".into()),
1103 ExprSpec::FuncRef("cview".into()),
1104 ],
1105 ),
1106 };
1107 let subs = crate::FunctionSpec {
1109 name: "subs".into(),
1110 type_params: vec![],
1111 params: vec![Param {
1112 name: "req".into(),
1113 ty: Type::Named("Request".into()),
1114 min_confidence: Confidence::External,
1115 }],
1116 produces: Produces {
1117 ty: Type::Named("Response".into()),
1118 confidence: Confidence::External,
1119 },
1120 requires: BTreeSet::new(),
1121 on_failure: vec![],
1122 steps: vec![],
1123 result: resp(lit("c")),
1124 };
1125
1126 let mut types = web::types();
1127 types.push(crate::edit::TypeDefSpec::Record {
1128 name: "Counter".into(),
1129 fields: vec![("value".into(), Type::Number)],
1130 });
1131 let mut funcs = web::functions();
1132 funcs.extend([cload, cview, app, app_live, subs]);
1133
1134 let e = Editor::new(Store::open_in_memory().unwrap());
1135 let (m, report) = e
1136 .apply_module(&ModuleSpec {
1137 name: "tealive".into(),
1138 types,
1139 functions: funcs,
1140 })
1141 .unwrap();
1142 assert!(report.ok(), "violations: {:?}", report.violations);
1143 lower(e.store(), &m).unwrap()
1144 }
1145
1146 #[test]
1152 fn diff_push_sends_the_change_wire_not_the_full_page() {
1153 let wasm = tea_live_counter_wasm();
1154 let mut path = std::env::temp_dir();
1155 path.push(format!("cairn-diffpush-{}.db", std::process::id()));
1156 let _ = std::fs::remove_file(&path);
1157 let dbp = path.to_str().unwrap();
1158
1159 let mut hub = LiveHub::new(wasm, "app", dbp);
1160 hub.set_live_handler("app_live");
1161
1162 let (id, doc) = hub
1166 .connect_subscribed("subs", "GET", "/", "")
1167 .unwrap();
1168 assert_eq!(
1169 doc.body, "<div>count: 0</div>",
1170 "first render is the HTML page at current (un-mutated) state"
1171 );
1172
1173 hub.request("GET", "/inc", "").unwrap();
1175 let pushes = hub.drain_and_push().unwrap();
1176 assert_eq!(pushes.len(), 1);
1177 assert_eq!(pushes[0].0, id);
1178 assert_eq!(pushes[0].1.body, "1:S1:0:8:count: 1");
1181 assert!(!pushes[0].1.body.contains("<div>"));
1182
1183 hub.request("GET", "/inc", "").unwrap();
1186 let p2 = hub.drain_and_push().unwrap();
1187 assert_eq!(p2.len(), 1);
1188 assert_eq!(p2[0].1.body, "1:S1:0:8:count: 2");
1189
1190 assert!(hub.drain_and_push().unwrap().is_empty());
1192 std::fs::remove_file(&path).unwrap();
1193 }
1194}