Skip to main content

sim_lib_web_bridge/
sync.rs

1//! Multi-surface synchronized edit sessions: broadcast, handoff, and replay.
2//!
3//! One resource can be open in MANY surfaces at once. The [`SurfaceHub`] owns
4//! the single CANONICAL value for every resource and is the one coordination
5//! point: a committed edit on any surface is applied to the canonical store and
6//! then BROADCAST -- as a Scene plus a Scene diff -- to every surface/pane
7//! viewing that resource, including the surface that issued the edit. This
8//! avoids trying to make N independent transports share events; the hub is the
9//! shared state.
10//!
11//! Edits flow through the universal default lens: an Intent is proposed and
12//! committed through `edit:default`, yielding the universal `{op: set-value,
13//! value: <proposed>}` operation, which is applied to the canonical store and
14//! recorded in an append-only [`EditRow`] ledger carrying the issuing
15//! operator and logical tick. Two surfaces editing the same resource therefore
16//! apply in submit order (last write wins), and the ledger is replayable:
17//! [`replay`] re-applies it to a seed state and reproduces the same canonical
18//! state, proving the edit log is auditable.
19//!
20//! Handoff ([`SurfaceHub::handoff`]) opens an already-held resource on a second
21//! surface so subsequent edits broadcast to both.
22
23use std::collections::BTreeMap;
24use std::sync::Arc;
25
26use sim_kernel::{Cx, DefaultFactory, EagerPolicy, Error, Expr, Result, Symbol};
27use sim_lib_view::codec::reduce_for_caps;
28use sim_lib_view::{
29    LensRegistry, SurfaceCaps, UNIVERSAL_EDITOR_ID, UNIVERSAL_VIEW_ID, register_universal_default,
30};
31
32/// One re-rendered Scene pushed to a surface/pane after a canonical edit.
33///
34/// `diff` is the Scene patch from the pane's previously cached Scene to
35/// `scene`; applying it with [`sim_lib_scene::apply`] reconstructs `scene`.
36#[derive(Clone, Debug)]
37pub struct Broadcast {
38    /// The surface that receives this update.
39    pub surface: Symbol,
40    /// The pane on that surface.
41    pub pane: Symbol,
42    /// The full new Scene for the pane.
43    pub scene: Expr,
44    /// The Scene patch from the pane's prior Scene to `scene`.
45    pub diff: Expr,
46}
47
48/// One append-only ledger row: a committed edit, attributed and replayable.
49///
50/// Rows are appended in submit order. Replaying them in order through
51/// [`replay`] reproduces the final canonical state.
52#[derive(Clone, Debug)]
53pub struct EditRow {
54    /// The resource that was edited.
55    pub resource: Symbol,
56    /// The issuing operator (from the Intent origin, e.g. `human`/`agent`).
57    pub operator: Symbol,
58    /// The issuing logical tick (from the Intent origin `at-tick`).
59    pub tick: u64,
60    /// The committed `{op: set-value, value: <proposed>}` operation.
61    pub operation: Expr,
62}
63
64/// A live binding of a `(surface, pane)` to a resource, with the last Scene
65/// shown there so the next broadcast can be diffed against it.
66struct Binding {
67    surface: Symbol,
68    pane: Symbol,
69    resource: Symbol,
70    last_scene: Expr,
71}
72
73/// The canonical multi-surface coordination point.
74///
75/// Holds the single canonical value per resource, the universal [`LensRegistry`]
76/// used to render and edit, an owned [`Cx`], the registered surfaces and their
77/// [`SurfaceCaps`], the live `(surface, pane)` bindings, and the append-only
78/// [`EditRow`] ledger.
79pub struct SurfaceHub {
80    canonical: BTreeMap<Symbol, Expr>,
81    registry: LensRegistry,
82    cx: Cx,
83    surfaces: BTreeMap<Symbol, SurfaceCaps>,
84    bindings: Vec<Binding>,
85    ledger: Vec<EditRow>,
86}
87
88impl Default for SurfaceHub {
89    fn default() -> Self {
90        Self::new()
91    }
92}
93
94impl SurfaceHub {
95    /// A new hub with the universal default lens registered (writable) and no
96    /// resources, surfaces, bindings, or ledger rows.
97    pub fn new() -> Self {
98        let mut registry = LensRegistry::new();
99        register_universal_default(&mut registry, false);
100        Self {
101            canonical: BTreeMap::new(),
102            registry,
103            cx: Cx::new(Arc::new(EagerPolicy), Arc::new(DefaultFactory)),
104            surfaces: BTreeMap::new(),
105            bindings: Vec::new(),
106            ledger: Vec::new(),
107        }
108    }
109
110    /// Set (or replace) the canonical value of `resource`.
111    pub fn seed(&mut self, resource: Symbol, value: Expr) {
112        self.canonical.insert(resource, value);
113    }
114
115    /// Register a surface (identified by `surface`) with its capabilities.
116    /// Re-registering replaces the stored caps.
117    pub fn register_surface(&mut self, surface: Symbol, caps: SurfaceCaps) {
118        self.surfaces.insert(surface, caps);
119    }
120
121    /// Bind `(surface, pane)` to `resource`, render the canonical value through
122    /// the universal view (projected to the surface caps via
123    /// [`reduce_for_caps`]), cache that Scene for the pane, and return it.
124    ///
125    /// An existing binding for the same `(surface, pane)` is replaced. Fails if
126    /// the surface is not registered or the resource has no canonical value.
127    pub fn open(&mut self, surface: &Symbol, pane: Symbol, resource: Symbol) -> Result<Expr> {
128        let caps = self.caps_of(surface)?;
129        let value = self.value_of(&resource)?;
130        let scene = render_for_surface(&mut self.cx, &self.registry, &caps, &value)?;
131        self.bindings
132            .retain(|binding| !(binding.surface == *surface && binding.pane == pane));
133        self.bindings.push(Binding {
134            surface: surface.clone(),
135            pane,
136            resource,
137            last_scene: scene.clone(),
138        });
139        Ok(scene)
140    }
141
142    /// Submit an Intent against the resource shown in `(surface, pane)`.
143    ///
144    /// The Intent is proposed and committed through the universal editor against
145    /// the CURRENT canonical value; the resulting `set-value` operation is
146    /// applied to the canonical store and appended to the ledger (attributed to
147    /// the Intent origin's operator and tick). Then, for EVERY `(surface, pane)`
148    /// viewing that resource -- including other surfaces -- the new value is
149    /// re-rendered, diffed against the pane's cached Scene, the cache updated,
150    /// and a [`Broadcast`] emitted. Returns all broadcasts.
151    ///
152    /// Fails closed (returns an error, never panics) if the pane is not open,
153    /// the resource is missing, the Intent is invalid, or the draft is not
154    /// committable.
155    pub fn submit(
156        &mut self,
157        surface: &Symbol,
158        pane: &Symbol,
159        intent: &Expr,
160    ) -> Result<Vec<Broadcast>> {
161        let resource = self
162            .bindings
163            .iter()
164            .find(|binding| binding.surface == *surface && binding.pane == *pane)
165            .map(|binding| binding.resource.clone())
166            .ok_or_else(|| Error::HostError(format!("({surface}, {pane}) is not open")))?;
167        let value = self.value_of(&resource)?;
168
169        let editor = Symbol::new(UNIVERSAL_EDITOR_ID);
170        let draft = self
171            .registry
172            .propose(&mut self.cx, &editor, &value, intent)?;
173        let operation = self.registry.commit(&mut self.cx, &editor, &draft)?;
174        let new_value = apply_set_value(&operation.form)?;
175
176        // Render EVERY per-surface broadcast into a staging buffer FIRST. A
177        // render (or a surface that lost its capabilities) can fail mid-iteration;
178        // if it does we must mutate nothing -- otherwise canonical/ledger move
179        // forward while some caches advance and no broadcast is delivered, an
180        // unrecoverable replay divergence. We commit only after all succeed.
181        let mut staged: Vec<(usize, Broadcast)> = Vec::new();
182        {
183            let Self {
184                registry,
185                cx,
186                surfaces,
187                bindings,
188                ..
189            } = self;
190            for (index, binding) in bindings.iter().enumerate() {
191                if binding.resource != resource {
192                    continue;
193                }
194                let caps = surfaces.get(&binding.surface).ok_or_else(|| {
195                    Error::HostError(format!(
196                        "surface '{}' lost its capabilities",
197                        binding.surface
198                    ))
199                })?;
200                let scene = render_for_surface(cx, registry, caps, &new_value)?;
201                let diff = sim_lib_scene::diff(&binding.last_scene, &scene);
202                staged.push((
203                    index,
204                    Broadcast {
205                        surface: binding.surface.clone(),
206                        pane: binding.pane.clone(),
207                        scene,
208                        diff,
209                    },
210                ));
211            }
212        }
213
214        // All broadcasts rendered: commit atomically -- canonical, then ledger,
215        // then swap in each surface's advanced last_scene cache.
216        self.canonical.insert(resource.clone(), new_value);
217        let (operator, tick) = origin_of(intent);
218        self.ledger.push(EditRow {
219            resource,
220            operator,
221            tick,
222            operation: operation.form,
223        });
224        let mut broadcasts = Vec::with_capacity(staged.len());
225        for (index, broadcast) in staged {
226            self.bindings[index].last_scene = broadcast.scene.clone();
227            broadcasts.push(broadcast);
228        }
229        Ok(broadcasts)
230    }
231
232    /// Hand `resource` off from `from` to `to`: open it on `to` in a new `pane`
233    /// and return its Scene. The `from` surface keeps its binding, so the
234    /// resource is now open on both and subsequent edits broadcast to both.
235    ///
236    /// Fails if `from` does not currently hold `resource`.
237    pub fn handoff(
238        &mut self,
239        from: &Symbol,
240        to: &Symbol,
241        resource: Symbol,
242        pane: Symbol,
243    ) -> Result<Expr> {
244        let held = self
245            .bindings
246            .iter()
247            .any(|binding| binding.surface == *from && binding.resource == resource);
248        if !held {
249            return Err(Error::HostError(format!(
250                "surface '{from}' does not hold resource '{resource}' to hand off"
251            )));
252        }
253        self.open(to, pane, resource)
254    }
255
256    /// The append-only edit ledger, in submit order.
257    pub fn ledger(&self) -> &[EditRow] {
258        &self.ledger
259    }
260
261    /// The current canonical value of `resource`, if any.
262    pub fn canonical(&self, resource: &Symbol) -> Option<&Expr> {
263        self.canonical.get(resource)
264    }
265
266    fn caps_of(&self, surface: &Symbol) -> Result<SurfaceCaps> {
267        self.surfaces
268            .get(surface)
269            .cloned()
270            .ok_or_else(|| Error::HostError(format!("surface '{surface}' is not registered")))
271    }
272
273    fn value_of(&self, resource: &Symbol) -> Result<Expr> {
274        self.canonical.get(resource).cloned().ok_or_else(|| {
275            Error::HostError(format!("resource '{resource}' has no canonical value"))
276        })
277    }
278}
279
280/// Re-apply a ledger to a seed canonical state, yielding the final state.
281///
282/// Rows are applied in order; for a resource, the last `set-value` wins. This is
283/// the replay surface that proves the ledger is auditable: feeding the rows
284/// produced by a run of edits back over the original seed reproduces the final
285/// canonical state of the hub.
286///
287/// Every committed row carries the universal `{op: set-value, ...}` operation,
288/// so replay fails closed if a row's operation is not a `set-value`: a foreign
289/// or corrupted ledger row is surfaced as an error rather than silently dropped,
290/// which would otherwise reproduce a state that never existed.
291pub fn replay(rows: &[EditRow], seed: BTreeMap<Symbol, Expr>) -> Result<BTreeMap<Symbol, Expr>> {
292    let mut state = seed;
293    for row in rows {
294        let value = apply_set_value(&row.operation)?;
295        state.insert(row.resource.clone(), value);
296    }
297    Ok(state)
298}
299
300/// Render `value` through the universal view, projected to `caps`.
301fn render_for_surface(
302    cx: &mut Cx,
303    registry: &LensRegistry,
304    caps: &SurfaceCaps,
305    value: &Expr,
306) -> Result<Expr> {
307    let scene = registry.render(cx, &Symbol::new(UNIVERSAL_VIEW_ID), value)?;
308    Ok(reduce_for_caps(&scene, caps))
309}
310
311/// Interpret the universal `{op: set-value, value: <v>}` operation, returning
312/// `<v>`. Any other shape fails closed.
313fn apply_set_value(operation: &Expr) -> Result<Expr> {
314    let Expr::Map(entries) = operation else {
315        return Err(Error::HostError("operation is not a map".to_owned()));
316    };
317    let is_set_value = entries.iter().any(|(key, value)| {
318        matches!(key, Expr::Symbol(symbol) if &*symbol.name == "op")
319            && matches!(value, Expr::Symbol(symbol) if &*symbol.name == "set-value")
320    });
321    if !is_set_value {
322        return Err(Error::HostError(
323            "operation is not a set-value op".to_owned(),
324        ));
325    }
326    entries
327        .iter()
328        .find_map(|(key, value)| {
329            matches!(key, Expr::Symbol(symbol) if &*symbol.name == "value").then_some(value)
330        })
331        .cloned()
332        .ok_or_else(|| Error::HostError("set-value operation is missing a 'value'".to_owned()))
333}
334
335/// Read the operator symbol and logical tick from an Intent origin, defaulting
336/// to `unknown`/`0` if absent (the Intent is validated before this is called).
337fn origin_of(intent: &Expr) -> (Symbol, u64) {
338    let origin = sim_value::access::field(intent, "origin");
339    let operator = origin
340        .and_then(|origin| sim_value::access::field_sym(origin, "operator"))
341        .unwrap_or_else(|| Symbol::new("unknown"));
342    let tick = origin
343        .and_then(|origin| sim_value::access::field_i64(origin, "at-tick"))
344        .unwrap_or(0)
345        .max(0) as u64;
346    (operator, tick)
347}
348
#[cfg(test)]
mod tests {
    //! Hub behavior tests: broadcast fan-out, atomic failure, handoff, and
    //! last-write-wins with ledger replay.

    use super::*;

    use sim_kernel::NumberLiteral;
    use sim_lib_intent::{Origin, intent};
    use sim_lib_view::surface;

    use sim_value::build::keyword as sym;

    /// An `i64`-domain number literal with the given canonical text.
    fn number(value: &str) -> Expr {
        Expr::Number(NumberLiteral {
            domain: sym("i64"),
            canonical: value.to_owned(),
        })
    }

    /// The seed document used by every test: `{a: 1, b: 2}`.
    fn doc() -> Expr {
        Expr::Map(vec![
            (Expr::Symbol(sym("a")), number("1")),
            (Expr::Symbol(sym("b")), number("2")),
        ])
    }

    /// An `edit-field` Intent setting top-level field `field` to `value`.
    fn edit(operator: Origin, field: &str, value: Expr) -> Expr {
        intent(
            "edit-field",
            operator,
            vec![
                ("target", doc()),
                (
                    "path",
                    Expr::List(vec![Expr::Vector(vec![
                        Expr::Symbol(sym("k")),
                        Expr::Symbol(sym(field)),
                    ])]),
                ),
                ("value", value),
            ],
        )
    }

    /// A hub with the `cli`, `web`, and `watch` surfaces registered from their
    /// presets and nothing seeded or opened.
    fn hub_with_surfaces() -> SurfaceHub {
        let mut hub = SurfaceHub::new();
        hub.register_surface(sym("cli"), surface::preset("cli").unwrap());
        hub.register_surface(sym("web"), surface::preset("webui").unwrap());
        hub.register_surface(sym("watch"), surface::preset("watch").unwrap());
        hub
    }

    /// The value stored under symbol key `name` in `map`, if `map` is a map
    /// that has such a key.
    fn field(map: &Expr, name: &str) -> Option<Expr> {
        let Expr::Map(entries) = map else {
            return None;
        };
        entries.iter().find_map(|(key, value)| {
            matches!(key, Expr::Symbol(symbol) if &*symbol.name == name).then(|| value.clone())
        })
    }

    #[test]
    fn an_edit_broadcasts_to_every_surface_viewing_the_resource() {
        let mut hub = hub_with_surfaces();
        hub.seed(sym("doc"), doc());
        let cli_scene = hub.open(&sym("cli"), sym("pane"), sym("doc")).unwrap();
        let web_scene = hub.open(&sym("web"), sym("pane"), sym("doc")).unwrap();

        let broadcasts = hub
            .submit(
                &sym("cli"),
                &sym("pane"),
                &edit(Origin::human(1), "a", number("9")),
            )
            .unwrap();

        // Exactly the two panes viewing `doc` receive a broadcast -- one each.
        // Pinning the count (rather than `>= 2`) also rejects duplicate or
        // spurious broadcasts.
        assert_eq!(broadcasts.len(), 2);
        assert!(broadcasts.iter().any(|b| b.surface == sym("cli")));
        assert!(broadcasts.iter().any(|b| b.surface == sym("web")));

        // Each diff reconstructs the surface's new Scene from its prior Scene.
        for broadcast in &broadcasts {
            let prior = if broadcast.surface == sym("cli") {
                &cli_scene
            } else {
                &web_scene
            };
            let rebuilt = sim_lib_scene::apply(prior, &broadcast.diff).unwrap();
            assert_eq!(rebuilt, broadcast.scene);
        }

        // The canonical value changed.
        let canonical = hub.canonical(&sym("doc")).unwrap();
        assert_eq!(field(canonical, "a"), Some(number("9")));
        assert_eq!(field(canonical, "b"), Some(number("2")));
    }

    #[test]
    fn a_mid_loop_broadcast_error_leaves_canonical_ledger_and_caches_unchanged() {
        let mut hub = hub_with_surfaces();
        hub.seed(sym("doc"), doc());
        // cli is bound first, web second, both on `doc`.
        let cli_scene = hub.open(&sym("cli"), sym("pane"), sym("doc")).unwrap();
        hub.open(&sym("web"), sym("pane"), sym("doc")).unwrap();

        let canonical_before = hub.canonical(&sym("doc")).cloned();
        let ledger_len_before = hub.ledger().len();

        // Simulate the web surface dropping its capabilities while its binding
        // remains -- the desync the atomic submit must tolerate without
        // corrupting shared state.
        hub.surfaces.remove(&sym("web"));

        // Submitting from cli stages cli's broadcast, then hits web's missing
        // caps mid-loop. The submit must fail closed and mutate nothing.
        let result = hub.submit(
            &sym("cli"),
            &sym("pane"),
            &edit(Origin::human(1), "a", number("9")),
        );
        assert!(
            result.is_err(),
            "a mid-loop render failure must fail the whole submit"
        );

        // Canonical and ledger never moved forward.
        assert_eq!(hub.canonical(&sym("doc")).cloned(), canonical_before);
        assert_eq!(hub.ledger().len(), ledger_len_before);
        // cli's cached last_scene was NOT advanced (no half-applied broadcast).
        let cli_last = hub
            .bindings
            .iter()
            .find(|binding| binding.surface == sym("cli") && binding.pane == sym("pane"))
            .map(|binding| binding.last_scene.clone());
        assert_eq!(
            cli_last,
            Some(cli_scene),
            "cli's cached scene must be untouched after the failed submit"
        );
    }

    #[test]
    fn handoff_extends_broadcast_to_the_target_surface() {
        let mut hub = hub_with_surfaces();
        hub.seed(sym("doc"), doc());
        hub.open(&sym("cli"), sym("pane"), sym("doc")).unwrap();
        hub.open(&sym("web"), sym("pane"), sym("doc")).unwrap();

        // Hand the resource off from cli to watch (a new pane on watch).
        hub.handoff(&sym("cli"), &sym("watch"), sym("doc"), sym("pane"))
            .unwrap();

        let broadcasts = hub
            .submit(
                &sym("web"),
                &sym("pane"),
                &edit(Origin::human(2), "b", number("7")),
            )
            .unwrap();

        // cli, web, AND watch all receive the broadcast now -- exactly once each.
        assert_eq!(broadcasts.len(), 3);
        assert!(broadcasts.iter().any(|b| b.surface == sym("cli")));
        assert!(broadcasts.iter().any(|b| b.surface == sym("web")));
        assert!(broadcasts.iter().any(|b| b.surface == sym("watch")));
    }

    #[test]
    fn two_writer_conflict_is_last_write_wins_and_replayable() {
        let mut hub = hub_with_surfaces();
        let seed = doc();
        hub.seed(sym("doc"), seed.clone());
        hub.open(&sym("cli"), sym("pane"), sym("doc")).unwrap();
        hub.open(&sym("web"), sym("pane"), sym("doc")).unwrap();

        // cli edits, then web edits the same resource and field.
        hub.submit(
            &sym("cli"),
            &sym("pane"),
            &edit(Origin::human(1), "a", number("10")),
        )
        .unwrap();
        hub.submit(
            &sym("web"),
            &sym("pane"),
            &edit(Origin::agent(2), "a", number("20")),
        )
        .unwrap();

        // The final canonical value reflects the LAST commit.
        let canonical = hub.canonical(&sym("doc")).unwrap().clone();
        assert_eq!(field(&canonical, "a"), Some(number("20")));

        // The ledger has two rows with the right operators and ticks, in order.
        let ledger = hub.ledger();
        assert_eq!(ledger.len(), 2);
        assert_eq!(ledger[0].operator, sym("human"));
        assert_eq!(ledger[0].tick, 1);
        assert_eq!(ledger[1].operator, sym("agent"));
        assert_eq!(ledger[1].tick, 2);

        // Replaying the ledger over the seed reproduces the final canonical state.
        let mut seed_state = BTreeMap::new();
        seed_state.insert(sym("doc"), seed);
        let replayed = replay(ledger, seed_state).expect("ledger rows are all set-value ops");
        assert_eq!(replayed.get(&sym("doc")), Some(&canonical));
    }
}