use crate::wasm::{
drain_published, drain_resp_headers, reason_phrase, serve_request_db,
HttpResponse, RunError,
};
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::time::Duration;
struct Conn {
id: u64,
topics: Vec<String>,
method: String,
path: String,
body: String,
prev: String,
}
pub struct LiveHub {
wasm: Vec<u8>,
handler: String,
live_handler: Option<String>,
db_path: String,
next: u64,
conns: Vec<Conn>,
}
fn split_envelope(body: &str) -> (String, String) {
let Some(colon) = body.find(':') else {
return (body.to_string(), String::new());
};
let Ok(n) = body[..colon].parse::<usize>() else {
return (body.to_string(), String::new());
};
let start = colon + 1;
let end = start + n;
if end > body.len() {
return (body.to_string(), String::new());
}
(body[start..end].to_string(), body[end..].to_string())
}
impl LiveHub {
pub fn new(wasm: Vec<u8>, handler: &str, db_path: &str) -> Self {
Self {
wasm,
handler: handler.into(),
live_handler: None,
db_path: db_path.into(),
next: 1,
conns: Vec::new(),
}
}
pub fn set_live_handler(&mut self, name: &str) {
self.live_handler = Some(name.into());
}
pub fn connect(
&mut self,
topics: Vec<String>,
method: &str,
path: &str,
body: &str,
) -> Result<(u64, HttpResponse), RunError> {
let (resp, prev) = if let Some(live) = self.live_handler.clone() {
let env = serve_request_db(
&self.wasm,
&live,
&self.db_path,
method,
path,
"",
)?;
let (html, ser) = split_envelope(&env.body);
(
HttpResponse {
status: env.status,
body: html,
},
ser,
)
} else {
let resp = serve_request_db(
&self.wasm,
&self.handler,
&self.db_path,
method,
path,
body,
)?;
(resp, String::new())
};
let _ = drain_published();
let _ = drain_resp_headers(); let id = self.next;
self.next += 1;
self.conns.push(Conn {
id,
topics,
method: method.into(),
path: path.into(),
body: body.into(),
prev,
});
Ok((id, resp))
}
pub fn connect_subscribed(
&mut self,
subs_handler: &str,
method: &str,
path: &str,
body: &str,
) -> Result<(u64, HttpResponse), RunError> {
let subs = serve_request_db(
&self.wasm,
subs_handler,
&self.db_path,
method,
path,
body,
)?;
let _ = drain_published();
let _ = drain_resp_headers();
let topics: Vec<String> = subs
.body
.split_whitespace()
.map(str::to_string)
.collect();
self.connect(topics, method, path, body)
}
pub fn request(
&self,
method: &str,
path: &str,
body: &str,
) -> Result<HttpResponse, RunError> {
serve_request_db(
&self.wasm,
&self.handler,
&self.db_path,
method,
path,
body,
)
}
pub fn drain_and_push(
&mut self,
) -> Result<Vec<(u64, HttpResponse)>, RunError> {
let topics = drain_published();
if topics.is_empty() {
return Ok(Vec::new());
}
#[allow(clippy::type_complexity)]
let hits: Vec<(usize, u64, String, String, String, String)> = self
.conns
.iter()
.enumerate()
.filter(|(_, c)| c.topics.iter().any(|t| topics.contains(t)))
.map(|(i, c)| {
(
i,
c.id,
c.method.clone(),
c.path.clone(),
c.body.clone(),
c.prev.clone(),
)
})
.collect();
let mut out = Vec::new();
match self.live_handler.clone() {
Some(live) => {
for (i, id, method, path, _body, prev) in hits {
let env = serve_request_db(
&self.wasm,
&live,
&self.db_path,
&method,
&path,
&prev,
)?;
let (changes, ser) = split_envelope(&env.body);
self.conns[i].prev = ser;
out.push((
id,
HttpResponse {
status: env.status,
body: changes,
},
));
}
}
None => {
for (_, id, method, path, body, _prev) in hits {
let resp = serve_request_db(
&self.wasm,
&self.handler,
&self.db_path,
&method,
&path,
&body,
)?;
out.push((id, resp));
}
}
}
let _ = drain_resp_headers();
Ok(out)
}
pub fn disconnect(&mut self, id: u64) {
self.conns.retain(|c| c.id != id);
}
pub fn connections(&self) -> usize {
self.conns.len()
}
pub fn is_diff(&self) -> bool {
self.live_handler.is_some()
}
}
pub fn sse_frame(body: &str) -> String {
let mut out = String::with_capacity(body.len() + 16);
for line in body.split('\n') {
out.push_str("data: ");
out.push_str(line);
out.push('\n');
}
out.push('\n');
out
}
pub fn live_client_script(live_path: &str) -> String {
const JS: &str = r#"<script>(function(){
var p=encodeURIComponent(location.pathname+location.search);
var es=new EventSource("__LP__?path="+p);
function root(){return document.body.firstChild;}
function ri(s,o){var j=s.indexOf(":",o);return [parseInt(s.slice(o,j),10),j+1];}
function rs(s,o){var x=ri(s,o);return [s.slice(x[1],x[1]+x[0]),x[1]+x[0]];}
function rp(s,o){var x=ri(s,o),n=x[0],q=x[1],a=[];for(var i=0;i<n;i++){var y=ri(s,q);a.push(y[0]);q=y[1];}return [a,q];}
function re(s,o){var t=s[o];o++;if(t=="T"){var x=rs(s,o);return [document.createTextNode(x[0]),x[1]];}var g=rs(s,o);o=g[1];var c=ri(s,o),nk=c[0];o=c[1];var e=document.createElement(g[0]);for(var i=0;i<nk;i++){var k=re(s,o);e.appendChild(k[0]);o=k[1];}return [e,o];}
function resolve(a){var c=root();for(var i=a.length-1;i>=0;i--){c=c.childNodes[a[i]];}return c;}
function apply(s){var x=ri(s,0),n=x[0],o=x[1];for(var i=0;i<n;i++){var op=s[o];o++;var pr=rp(s,o),a=pr[0];o=pr[1];if(op=="S"){var v=rs(s,o);o=v[1];resolve(a).textContent=v[0];}else if(op=="R"){var m=re(s,o);o=m[1];resolve(a).replaceWith(m[0]);}else if(op=="A"){var m2=re(s,o);o=m2[1];resolve(a).appendChild(m2[0]);}else if(op=="T"){var l=ri(s,o);o=l[1];var el=resolve(a);while(el.childNodes.length>l[0]){el.removeChild(el.lastChild);}}}}
es.onmessage=function(ev){var d=ev.data;if(/^[0-9]+:/.test(d)){apply(d);}else{document.body.innerHTML=d;}};
})();</script>"#;
JS.replace("__LP__", live_path)
}
fn urldecode(s: &str) -> String {
let b = s.as_bytes();
let mut out = Vec::with_capacity(b.len());
let mut i = 0;
while i < b.len() {
if b[i] == b'%' && i + 2 < b.len() {
let hi = (b[i + 1] as char).to_digit(16);
let lo = (b[i + 2] as char).to_digit(16);
if let (Some(h), Some(l)) = (hi, lo) {
out.push((h * 16 + l) as u8);
i += 3;
continue;
}
}
out.push(b[i]);
i += 1;
}
String::from_utf8_lossy(&out).into_owned()
}
fn query_param(query: &str, key: &str) -> Option<String> {
query.split('&').find_map(|kv| {
let (k, v) = kv.split_once('=')?;
(k == key).then(|| urldecode(v))
})
}
pub fn serve_http_live(
wasm: Vec<u8>,
handler: &str,
subs: Option<&str>,
live: Option<&str>,
addr: &str,
db_path: &str,
live_path: &str,
) -> Result<(), RunError> {
let listener =
TcpListener::bind(addr).map_err(|e| RunError::Wasmtime(e.to_string()))?;
let mut hub = LiveHub::new(wasm, handler, db_path);
if let Some(l) = live {
hub.set_live_handler(l);
}
serve_http_live_on(listener, hub, subs, live_path)
}
fn serve_http_live_on(
listener: TcpListener,
mut hub: LiveHub,
subs: Option<&str>,
live_path: &str,
) -> Result<(), RunError> {
let mut parked: HashMap<u64, TcpStream> = HashMap::new();
for stream in listener.incoming() {
let Ok(mut stream) = stream else { continue };
let mut reader = BufReader::new(&stream);
let mut line = String::new();
if reader.read_line(&mut line).is_err() {
continue;
}
let mut parts = line.split_whitespace();
let method = parts.next().unwrap_or("GET").to_string();
let target = parts.next().unwrap_or("/").to_string();
let mut content_length = 0usize;
loop {
let mut h = String::new();
if reader.read_line(&mut h).is_err() {
break;
}
let h = h.trim_end();
if h.is_empty() {
break;
}
if let Some((name, value)) = h.split_once(':') {
if name.eq_ignore_ascii_case("content-length") {
content_length = value.trim().parse().unwrap_or(0);
}
}
}
let mut body = String::new();
if content_length > 0 {
let mut buf = vec![0u8; content_length];
if reader.read_exact(&mut buf).is_ok() {
body = String::from_utf8_lossy(&buf).into_owned();
}
}
drop(reader);
let (path, query) = target.split_once('?').unwrap_or((&target, ""));
if path == live_path {
let page = query_param(query, "path").unwrap_or_else(|| "/".into());
let opened = match subs {
Some(s) => hub.connect_subscribed(s, "GET", &page, ""),
None => hub.connect(Vec::new(), "GET", &page, ""),
};
let Ok((id, resp)) = opened else { continue };
let head = "HTTP/1.1 200 OK\r\n\
Content-Type: text/event-stream\r\n\
Cache-Control: no-cache\r\n\
Connection: keep-alive\r\n\r\n";
let _ = stream.set_write_timeout(Some(Duration::from_secs(5)));
let ok = stream.write_all(head.as_bytes()).is_ok()
&& (hub.is_diff()
|| stream
.write_all(sse_frame(&resp.body).as_bytes())
.is_ok());
if ok {
parked.insert(id, stream);
} else {
hub.disconnect(id);
}
continue;
}
let (status, mut out_body) =
match hub.request(&method, &target, &body) {
Ok(r) => (r.status, r.body),
Err(e) => (500, format!("cairn handler error: {e}")),
};
if method.eq_ignore_ascii_case("GET") {
out_body.push_str(&live_client_script(live_path));
}
let extra: String = drain_resp_headers()
.into_iter()
.map(|(n, v)| format!("{n}: {v}\r\n"))
.collect();
let resp = format!(
"HTTP/1.1 {status} {}\r\nContent-Length: {}\r\n\
Content-Type: text/html; charset=utf-8\r\n{extra}\r\n{}",
reason_phrase(status),
out_body.len(),
out_body
);
let _ = stream.write_all(resp.as_bytes());
drop(stream);
let pushes = match hub.drain_and_push() {
Ok(p) => p,
Err(_) => continue,
};
for (id, r) in pushes {
let frame = sse_frame(&r.body);
let dead = match parked.get_mut(&id) {
Some(sock) => sock.write_all(frame.as_bytes()).is_err(),
None => false,
};
if dead {
parked.remove(&id);
hub.disconnect(id);
}
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::edit::{Editor, ModuleSpec};
use crate::node::{Param, Produces};
use crate::store::Store;
use crate::ty::{Confidence, Effect, Type};
use crate::web;
use crate::wasm::lower;
use crate::ExprSpec;
use std::collections::BTreeSet;
fn counter_app_wasm() -> Vec<u8> {
let lit = |s: &str| ExprSpec::Str(s.into());
let r = |n: &str| ExprSpec::Ref(n.into());
let cat = |a: ExprSpec, b: ExprSpec| {
ExprSpec::StrConcat(Box::new(a), Box::new(b))
};
let field = |b: ExprSpec, t: &str, f: &str| ExprSpec::Field {
base: Box::new(b),
type_name: t.into(),
field: f.into(),
};
let dbq = |sql: &str| ExprSpec::DbQuery {
sql: Box::new(lit(sql)),
params: Box::new(ExprSpec::ListEmpty { elem: Type::String }),
};
let resp = |body: ExprSpec| ExprSpec::Record {
type_name: "Response".into(),
fields: vec![
("status".into(), ExprSpec::Lit(200)),
("body".into(), body),
],
};
let create =
"CREATE TABLE IF NOT EXISTS ctr(id INTEGER PRIMARY KEY, v INTEGER)";
let mut effs = BTreeSet::new();
effs.insert(Effect::Db);
effs.insert(Effect::Live);
let app = crate::FunctionSpec {
name: "app".into(),
type_params: vec![],
params: vec![Param {
name: "req".into(),
ty: Type::Named("Request".into()),
min_confidence: Confidence::External,
}],
produces: Produces {
ty: Type::Named("Response".into()),
confidence: Confidence::External,
},
requires: effs,
on_failure: vec![],
steps: vec![
crate::StepSpec {
binding: "path".into(),
value: field(r("req"), "Request", "path"),
},
crate::StepSpec {
binding: "c".into(),
value: dbq(create),
},
],
result: ExprSpec::If {
cond: Box::new(ExprSpec::StrEq(
Box::new(r("path")),
Box::new(lit("/add")),
)),
then_branch: Box::new(resp(cat(
cat(
lit("ok"),
dbq("INSERT INTO ctr(id,v) VALUES(1,1) \
ON CONFLICT(id) DO UPDATE SET v=v+1"),
),
ExprSpec::NumberToStr(Box::new(ExprSpec::Publish(
Box::new(lit("items")),
))),
))),
else_branch: Box::new(resp(cat(
lit("count: "),
dbq("SELECT COALESCE((SELECT v FROM ctr WHERE id=1),0)"),
))),
},
};
let subs = crate::FunctionSpec {
name: "subs".into(),
type_params: vec![],
params: vec![Param {
name: "req".into(),
ty: Type::Named("Request".into()),
min_confidence: Confidence::External,
}],
produces: Produces {
ty: Type::Named("Response".into()),
confidence: Confidence::External,
},
requires: BTreeSet::new(),
on_failure: vec![],
steps: vec![crate::StepSpec {
binding: "path".into(),
value: field(r("req"), "Request", "path"),
}],
result: ExprSpec::If {
cond: Box::new(ExprSpec::StrEq(
Box::new(r("path")),
Box::new(lit("/b")),
)),
then_branch: Box::new(resp(lit("other"))),
else_branch: Box::new(resp(lit("items"))),
},
};
let e = Editor::new(Store::open_in_memory().unwrap());
let (m, report) = e
.apply_module(&ModuleSpec {
name: "live".into(),
types: web::types(),
functions: vec![app, subs],
})
.unwrap();
assert!(report.ok(), "violations: {:?}", report.violations);
lower(e.store(), &m).unwrap()
}
#[test]
fn publish_refreshes_only_subscribed_connections() {
let wasm = counter_app_wasm();
let mut path = std::env::temp_dir();
path.push(format!("cairn-live-{}.db", std::process::id()));
let _ = std::fs::remove_file(&path);
let dbp = path.to_str().unwrap();
let mut hub = LiveHub::new(wasm, "app", dbp);
let (a, a0) = hub
.connect_subscribed("subs", "GET", "/", "")
.unwrap();
assert!(a0.body.contains("count: 0"), "initial render: {}", a0.body);
let (b, _) = hub
.connect_subscribed("subs", "GET", "/b", "")
.unwrap();
assert_eq!(hub.connections(), 2);
hub.request("POST", "/add", "").unwrap();
let pushes = hub.drain_and_push().unwrap();
assert_eq!(pushes.len(), 1, "only the 'items' subscriber refreshes");
assert_eq!(pushes[0].0, a, "connection A (subscribed to items)");
assert_ne!(pushes[0].0, b, "connection B must not be pushed");
assert!(
pushes[0].1.body.contains("count: 1"),
"refresh reflects new state: {}",
pushes[0].1.body
);
assert!(hub.drain_and_push().unwrap().is_empty());
hub.disconnect(a);
assert_eq!(hub.connections(), 1, "B remains after A is pruned");
hub.request("POST", "/add", "").unwrap();
assert!(
hub.drain_and_push().unwrap().is_empty(),
"a disconnected connection is never refreshed"
);
hub.disconnect(a); assert_eq!(hub.connections(), 1);
std::fs::remove_file(&path).unwrap();
}
#[test]
fn sse_frame_and_client_shim() {
assert_eq!(
sse_frame("<div>a</div>\n<p>b</p>"),
"data: <div>a</div>\ndata: <p>b</p>\n\n"
);
assert_eq!(sse_frame(""), "data: \n\n");
let s = live_client_script("/__live/");
assert!(s.contains("new EventSource(\"/__live/?path=\"+p)"));
assert!(s.contains("encodeURIComponent(location.pathname+location.search)"));
assert!(s.contains("function apply(s)"));
assert!(s.contains("/^[0-9]+:/.test(d)"));
assert!(s.contains("resolve(a).textContent=v[0]"));
assert!(s.contains("resolve(a).replaceWith"));
assert!(s.contains("childNodes[a[i]]"));
assert!(s.contains("document.body.innerHTML=d"));
assert!(s.starts_with("<script>") && s.ends_with("</script>"));
}
#[test]
fn serve_http_live_pushes_a_refresh_over_a_real_socket() {
let wasm = counter_app_wasm();
let mut path = std::env::temp_dir();
path.push(format!("cairn-live-srv-{}.db", std::process::id()));
let _ = std::fs::remove_file(&path);
let dbp = path.to_str().unwrap().to_string();
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let port = listener.local_addr().unwrap().port();
let hub = LiveHub::new(wasm, "app", &dbp);
std::thread::spawn(move || {
let _ = serve_http_live_on(listener, hub, Some("subs"), "/__live/");
});
fn wait_for(s: &mut TcpStream, needle: &str) -> String {
s.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
let mut acc = String::new();
let mut buf = [0u8; 1024];
loop {
match s.read(&mut buf) {
Ok(0) => break,
Ok(n) => {
acc.push_str(&String::from_utf8_lossy(&buf[..n]));
if acc.contains(needle) {
break;
}
}
Err(_) => break,
}
}
acc
}
let connect = || loop {
if let Ok(s) = TcpStream::connect(("127.0.0.1", port)) {
return s;
}
std::thread::sleep(Duration::from_millis(10));
};
let mut sse = connect();
sse.write_all(b"GET /__live/?path=%2F HTTP/1.1\r\nHost: x\r\n\r\n")
.unwrap();
let first = wait_for(&mut sse, "count: 0");
assert!(first.contains("text/event-stream"), "SSE head: {first}");
assert!(first.contains("data: count: 0"), "first frame: {first}");
let mut post = connect();
post.write_all(
b"POST /add HTTP/1.1\r\nHost: x\r\nContent-Length: 0\r\n\r\n",
)
.unwrap();
let _ = wait_for(&mut post, "\r\n\r\n");
let pushed = wait_for(&mut sse, "count: 1");
assert!(pushed.contains("data: count: 1"), "no push: {pushed}");
let mut page = connect();
page.write_all(b"GET / HTTP/1.1\r\nHost: x\r\n\r\n").unwrap();
let doc = wait_for(&mut page, "</script>");
assert!(doc.contains("count: 1"), "page state: {doc}");
assert!(
doc.contains("new EventSource(\"/__live/?path=\"+p)"),
"shim not injected: {doc}"
);
let _ = std::fs::remove_file(&path);
}
#[test]
fn serve_http_live_pushes_the_change_wire_over_a_real_socket() {
let wasm = tea_live_counter_wasm();
let mut path = std::env::temp_dir();
path.push(format!("cairn-diffsrv-{}.db", std::process::id()));
let _ = std::fs::remove_file(&path);
let dbp = path.to_str().unwrap().to_string();
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let port = listener.local_addr().unwrap().port();
let mut hub = LiveHub::new(wasm, "app", &dbp);
hub.set_live_handler("app_live");
std::thread::spawn(move || {
let _ = serve_http_live_on(listener, hub, Some("subs"), "/__live/");
});
fn wait_for(s: &mut TcpStream, needle: &str) -> String {
s.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
let mut acc = String::new();
let mut buf = [0u8; 1024];
loop {
match s.read(&mut buf) {
Ok(0) => break,
Ok(n) => {
acc.push_str(&String::from_utf8_lossy(&buf[..n]));
if acc.contains(needle) {
break;
}
}
Err(_) => break,
}
}
acc
}
let connect = || loop {
if let Ok(s) = TcpStream::connect(("127.0.0.1", port)) {
return s;
}
std::thread::sleep(Duration::from_millis(10));
};
let mut page = connect();
page.write_all(b"GET / HTTP/1.1\r\nHost: x\r\n\r\n").unwrap();
let doc = wait_for(&mut page, "</script>");
assert!(doc.contains("<div>count: 0</div>"), "page: {doc}");
assert!(
doc.contains("new EventSource(\"/__live/?path=\"+p)")
&& doc.contains("function apply(s)"),
"dual-mode shim missing: {doc}"
);
let mut sse = connect();
sse.write_all(b"GET /__live/?path=%2F HTTP/1.1\r\nHost: x\r\n\r\n")
.unwrap();
let head = wait_for(&mut sse, "\r\n\r\n");
assert!(head.contains("text/event-stream"), "SSE head: {head}");
let mut act = connect();
act.write_all(b"GET /inc HTTP/1.1\r\nHost: x\r\n\r\n").unwrap();
let _ = wait_for(&mut act, "\r\n\r\n");
let pushed = wait_for(&mut sse, "count: 1");
assert!(
pushed.contains("data: 1:S1:0:8:count: 1"),
"expected the Change wire, got: {pushed:?}"
);
assert!(!pushed.contains("<div>"), "must not be full HTML");
let _ = std::fs::remove_file(&path);
}
fn tea_live_counter_wasm() -> Vec<u8> {
let lit = |s: &str| ExprSpec::Str(s.into());
let r = |n: &str| ExprSpec::Ref(n.into());
let cat = |a: ExprSpec, b: ExprSpec| {
ExprSpec::StrConcat(Box::new(a), Box::new(b))
};
let field = |b: ExprSpec, t: &str, f: &str| ExprSpec::Field {
base: Box::new(b),
type_name: t.into(),
field: f.into(),
};
let dbq = |sql: &str| ExprSpec::DbQuery {
sql: Box::new(lit(sql)),
params: Box::new(ExprSpec::ListEmpty { elem: Type::String }),
};
let call = |f: &str, a: Vec<ExprSpec>| ExprSpec::Call {
func: f.into(),
args: a,
};
let resp = |body: ExprSpec| ExprSpec::Record {
type_name: "Response".into(),
fields: vec![
("status".into(), ExprSpec::Lit(200)),
("body".into(), body),
],
};
let create =
"CREATE TABLE IF NOT EXISTS ctr(id INTEGER PRIMARY KEY, v INTEGER)";
let mut dbl = BTreeSet::new();
dbl.insert(Effect::Db);
dbl.insert(Effect::Live);
let mut dbo = BTreeSet::new();
dbo.insert(Effect::Db);
let cload = crate::FunctionSpec {
name: "cload".into(),
type_params: vec![],
params: vec![],
produces: Produces {
ty: Type::Named("Counter".into()),
confidence: Confidence::External,
},
requires: dbo.clone(),
on_failure: vec![],
steps: vec![crate::StepSpec {
binding: "c".into(),
value: dbq(create),
}],
result: ExprSpec::Record {
type_name: "Counter".into(),
fields: vec![(
"value".into(),
ExprSpec::StrToNumber(Box::new(dbq(
"SELECT COALESCE((SELECT v FROM ctr WHERE id=1),0)",
))),
)],
},
};
let cview = crate::FunctionSpec {
name: "cview".into(),
type_params: vec![],
params: vec![Param {
name: "m".into(),
ty: Type::Named("Counter".into()),
min_confidence: Confidence::External,
}],
produces: Produces {
ty: Type::Named("Element".into()),
confidence: Confidence::External,
},
requires: BTreeSet::new(),
on_failure: vec![],
steps: vec![],
result: ExprSpec::Variant {
type_name: "Element".into(),
case: "El".into(),
fields: vec![
("tag".into(), lit("div")),
(
"kids".into(),
ExprSpec::List(vec![ExprSpec::Variant {
type_name: "Element".into(),
case: "Text".into(),
fields: vec![(
"content".into(),
cat(
lit("count: "),
ExprSpec::NumberToStr(Box::new(field(
r("m"),
"Counter",
"value",
))),
),
)],
}]),
),
],
},
};
let app = crate::FunctionSpec {
name: "app".into(),
type_params: vec![],
params: vec![Param {
name: "req".into(),
ty: Type::Named("Request".into()),
min_confidence: Confidence::External,
}],
produces: Produces {
ty: Type::Named("Response".into()),
confidence: Confidence::External,
},
requires: dbl.clone(),
on_failure: vec![],
steps: vec![
crate::StepSpec {
binding: "path".into(),
value: field(r("req"), "Request", "path"),
},
crate::StepSpec {
binding: "c".into(),
value: dbq(create),
},
],
result: ExprSpec::If {
cond: Box::new(ExprSpec::StrEq(
Box::new(r("path")),
Box::new(lit("/inc")),
)),
then_branch: Box::new(resp(cat(
cat(
lit("ok"),
dbq("INSERT INTO ctr(id,v) VALUES(1,1) \
ON CONFLICT(id) DO UPDATE SET v=v+1"),
),
ExprSpec::NumberToStr(Box::new(ExprSpec::Publish(
Box::new(lit("c")),
))),
))),
else_branch: Box::new(resp(call(
"render_html",
vec![call("cview", vec![call("cload", vec![])])],
))),
},
};
let app_live = crate::FunctionSpec {
name: "app_live".into(),
type_params: vec![],
params: vec![Param {
name: "req".into(),
ty: Type::Named("Request".into()),
min_confidence: Confidence::External,
}],
produces: Produces {
ty: Type::Named("Response".into()),
confidence: Confidence::External,
},
requires: dbo.clone(),
on_failure: vec![],
steps: vec![],
result: call(
"render_live",
vec![
field(r("req"), "Request", "body"),
ExprSpec::FuncRef("cload".into()),
ExprSpec::FuncRef("cview".into()),
],
),
};
let subs = crate::FunctionSpec {
name: "subs".into(),
type_params: vec![],
params: vec![Param {
name: "req".into(),
ty: Type::Named("Request".into()),
min_confidence: Confidence::External,
}],
produces: Produces {
ty: Type::Named("Response".into()),
confidence: Confidence::External,
},
requires: BTreeSet::new(),
on_failure: vec![],
steps: vec![],
result: resp(lit("c")),
};
let mut types = web::types();
types.push(crate::edit::TypeDefSpec::Record {
name: "Counter".into(),
fields: vec![("value".into(), Type::Number)],
});
let mut funcs = web::functions();
funcs.extend([cload, cview, app, app_live, subs]);
let e = Editor::new(Store::open_in_memory().unwrap());
let (m, report) = e
.apply_module(&ModuleSpec {
name: "tealive".into(),
types,
functions: funcs,
})
.unwrap();
assert!(report.ok(), "violations: {:?}", report.violations);
lower(e.store(), &m).unwrap()
}
#[test]
fn diff_push_sends_the_change_wire_not_the_full_page() {
let wasm = tea_live_counter_wasm();
let mut path = std::env::temp_dir();
path.push(format!("cairn-diffpush-{}.db", std::process::id()));
let _ = std::fs::remove_file(&path);
let dbp = path.to_str().unwrap();
let mut hub = LiveHub::new(wasm, "app", dbp);
hub.set_live_handler("app_live");
let (id, doc) = hub
.connect_subscribed("subs", "GET", "/", "")
.unwrap();
assert_eq!(
doc.body, "<div>count: 0</div>",
"first render is the HTML page at current (un-mutated) state"
);
hub.request("GET", "/inc", "").unwrap();
let pushes = hub.drain_and_push().unwrap();
assert_eq!(pushes.len(), 1);
assert_eq!(pushes[0].0, id);
assert_eq!(pushes[0].1.body, "1:S1:0:8:count: 1");
assert!(!pushes[0].1.body.contains("<div>"));
hub.request("GET", "/inc", "").unwrap();
let p2 = hub.drain_and_push().unwrap();
assert_eq!(p2.len(), 1);
assert_eq!(p2[0].1.body, "1:S1:0:8:count: 2");
assert!(hub.drain_and_push().unwrap().is_empty());
std::fs::remove_file(&path).unwrap();
}
}