Skip to main content

shelly/
session.rs

1use crate::nested::NestedLiveViewInstance;
2use crate::{
3    ClientMessage, ComponentRender, Context, DynamicSlotPatch, Event, Html, LiveResult, LiveView,
4    NestedLiveViewId, NestedLiveViewSnapshot, NestedLiveViewState, NoopTelemetrySink,
5    ServerMessage, SessionId, SessionReplayMetadata, SessionReplayTrace, SessionTraceRecorder,
6    ShellyError, TelemetryEvent, TelemetryEventKind, TelemetrySink, TraceRedactionPolicy,
7    PROTOCOL_VERSION_V1,
8};
9use serde_json::{json, Value};
10use std::{
11    collections::BTreeMap,
12    sync::Arc,
13    time::{Duration, Instant},
14};
15
16const PROTOCOL_VERSION: &str = PROTOCOL_VERSION_V1;
17const INTERNAL_RENDER_FLUSH_ID: &str = "__shelly_internal_render_flush";
18pub const INTERNAL_RENDER_FLUSH_EVENT: &str = "__shelly_internal_render_flush";
19
20/// Runtime holder for one connected LiveView instance.
21pub struct LiveSession {
22    view: Box<dyn LiveView>,
23    ctx: Context,
24    revision: u64,
25    last_render: Option<Html>,
26    telemetry: Arc<dyn TelemetrySink>,
27    default_render_cadence_ms: u64,
28    last_render_sent_at: Option<Instant>,
29    coalesced_render_count: usize,
30    coalesced_render_since: Option<Instant>,
31    render_flush_deadline: Option<Instant>,
32    trace_recorder: Option<SessionTraceRecorder>,
33    children: BTreeMap<NestedLiveViewId, NestedLiveViewInstance>,
34}
35
36impl LiveSession {
37    /// Create a new session from a boxed LiveView instance.
38    pub fn new(view: Box<dyn LiveView>, target_dom_id: impl Into<String>) -> Self {
39        Self::new_with_session_id(view, SessionId::new().to_string(), target_dom_id)
40    }
41
42    /// Create a new session with an existing signed session id.
43    pub fn new_with_session_id(
44        view: Box<dyn LiveView>,
45        session_id: impl Into<String>,
46        target_dom_id: impl Into<String>,
47    ) -> Self {
48        Self::new_with_route_and_session_id(view, session_id, target_dom_id, "/", BTreeMap::new())
49    }
50
51    /// Create a new session with adapter-matched route data.
52    pub fn new_with_route(
53        view: Box<dyn LiveView>,
54        target_dom_id: impl Into<String>,
55        route_path: impl Into<String>,
56        route_params: BTreeMap<String, String>,
57    ) -> Self {
58        Self::new_with_route_and_session_id(
59            view,
60            SessionId::new().to_string(),
61            target_dom_id,
62            route_path,
63            route_params,
64        )
65    }
66
67    /// Create a new session with an existing signed session id and route data.
68    pub fn new_with_route_and_session_id(
69        view: Box<dyn LiveView>,
70        session_id: impl Into<String>,
71        target_dom_id: impl Into<String>,
72        route_path: impl Into<String>,
73        route_params: BTreeMap<String, String>,
74    ) -> Self {
75        let mut ctx =
76            Context::new_with_session_id(SessionId::from_string(session_id.into()), target_dom_id);
77        ctx.set_route(route_path, route_params);
78        Self {
79            view,
80            ctx,
81            revision: 0,
82            last_render: None,
83            telemetry: Arc::new(NoopTelemetrySink),
84            default_render_cadence_ms: 0,
85            last_render_sent_at: None,
86            coalesced_render_count: 0,
87            coalesced_render_since: None,
88            render_flush_deadline: None,
89            trace_recorder: None,
90            children: BTreeMap::new(),
91        }
92    }
93
94    /// Set a telemetry sink for runtime/session instrumentation.
95    pub fn set_telemetry_sink(&mut self, telemetry: Arc<dyn TelemetrySink>) {
96        self.telemetry = telemetry;
97    }
98
99    /// Configure default root-render cadence in milliseconds.
100    ///
101    /// A value of `0` keeps immediate render behavior.
102    pub fn set_default_render_cadence_ms(&mut self, cadence_ms: u64) {
103        self.default_render_cadence_ms = cadence_ms;
104    }
105
106    /// Run the mount callback and mark the session as connected.
107    pub fn mount(&mut self) -> LiveResult {
108        let started = Instant::now();
109        self.ctx.set_connected(true);
110        let result = self
111            .view
112            .mount(&mut self.ctx)
113            .and_then(|_| self.view.handle_params(&mut self.ctx));
114        if result.is_err() {
115            self.ctx.set_connected(false);
116            let _ = self.ctx.drain_pushes();
117            let _ = self.ctx.drain_pubsub_commands();
118            let _ = self.ctx.drain_runtime_commands();
119        }
120        self.emit_telemetry(
121            self.telemetry_event(TelemetryEventKind::Mount)
122                .with_ok(result.is_ok())
123                .with_latency_ms(started.elapsed().as_millis() as u64),
124        );
125        result
126    }
127
128    /// Drain PubSub commands queued by mount, params, or event handlers.
129    pub fn drain_pubsub_commands(&mut self) -> Vec<crate::PubSubCommand> {
130        let mut commands = self.ctx.drain_pubsub_commands();
131        for child in self.children.values_mut() {
132            commands.extend(child.ctx.drain_pubsub_commands());
133        }
134        commands
135    }
136
137    /// Drain runtime commands queued by mount, params, or event handlers.
138    pub fn drain_runtime_commands(&mut self) -> Vec<crate::RuntimeCommand> {
139        let mut commands = self.ctx.drain_runtime_commands();
140        for child in self.children.values_mut() {
141            commands.extend(child.ctx.drain_runtime_commands());
142        }
143        commands
144    }
145
146    /// Session id used by protocol hello messages.
147    pub fn session_id(&self) -> &str {
148        self.ctx.session_id().as_str()
149    }
150
151    /// Current internal route path tracked by the session.
152    pub fn route_path(&self) -> &str {
153        self.ctx.route_path()
154    }
155
156    /// Current route params tracked by the session.
157    pub fn route_params(&self) -> &BTreeMap<String, String> {
158        self.ctx.route_params()
159    }
160
161    /// Tenant context tracked by this session.
162    pub fn tenant_id(&self) -> Option<&str> {
163        self.ctx.tenant_id()
164    }
165
166    /// Set tenant context for this session.
167    pub fn set_tenant_id(&mut self, tenant_id: impl Into<String>) {
168        self.ctx.set_tenant_id(tenant_id);
169    }
170
171    /// Replace tenant context for this session.
172    pub fn set_tenant_id_optional(&mut self, tenant_id: Option<String>) {
173        self.ctx.set_tenant_id_optional(tenant_id);
174    }
175
176    /// Current monotonic render revision.
177    pub fn revision(&self) -> u64 {
178        self.revision
179    }
180
181    /// Return snapshots for mounted, suspended, and terminated nested LiveViews.
182    pub fn child_snapshots(&self) -> Vec<NestedLiveViewSnapshot> {
183        self.children
184            .iter()
185            .map(|(id, child)| child.snapshot(id.clone()))
186            .collect()
187    }
188
189    /// Return the lifecycle state for one nested LiveView.
190    pub fn child_state(&self, id: impl AsRef<str>) -> Option<NestedLiveViewState> {
191        self.children
192            .get(&NestedLiveViewId::new(id.as_ref()))
193            .map(|child| child.state)
194    }
195
196    /// Mount a child LiveView under this session and return its initial patch.
197    pub fn mount_child(
198        &mut self,
199        id: impl Into<NestedLiveViewId>,
200        view: Box<dyn LiveView>,
201        target_dom_id: impl Into<String>,
202    ) -> LiveResult<ServerMessage> {
203        let route_path = self.route_path().to_string();
204        let route_params = self.route_params().clone();
205        self.mount_child_with_route(id, view, target_dom_id, route_path, route_params)
206    }
207
208    /// Mount a child LiveView with explicit route data and return its initial patch.
209    pub fn mount_child_with_route(
210        &mut self,
211        id: impl Into<NestedLiveViewId>,
212        view: Box<dyn LiveView>,
213        target_dom_id: impl Into<String>,
214        route_path: impl Into<String>,
215        route_params: BTreeMap<String, String>,
216    ) -> LiveResult<ServerMessage> {
217        let id = id.into();
218        let target_dom_id = target_dom_id.into();
219        if self.children.contains_key(&id) {
220            return Err(ShellyError::InvalidMessage(format!(
221                "nested LiveView `{id}` is already mounted"
222            )));
223        }
224
225        let mut ctx = Context::new_with_session_id(
226            SessionId::from_string(self.session_id().to_string()),
227            target_dom_id,
228        );
229        ctx.set_connected(true);
230        ctx.set_tenant_id_optional(self.tenant_id().map(ToString::to_string));
231        ctx.set_route(route_path, route_params);
232
233        let mut child = NestedLiveViewInstance {
234            view,
235            ctx,
236            state: NestedLiveViewState::Mounted,
237            last_render: None,
238        };
239
240        child.view.mount(&mut child.ctx)?;
241        child.view.handle_params(&mut child.ctx)?;
242
243        self.children.insert(id.clone(), child);
244        let patch = self.render_child_update_by_id(&id)?;
245        self.emit_child_lifecycle_telemetry(&id, "mount", true);
246        Ok(patch)
247    }
248
249    /// Update route params for a mounted child LiveView and return a child patch.
250    pub fn update_child_params(
251        &mut self,
252        id: impl AsRef<str>,
253        route_path: impl Into<String>,
254        route_params: BTreeMap<String, String>,
255    ) -> LiveResult<ServerMessage> {
256        let id = NestedLiveViewId::new(id.as_ref());
257        {
258            let child = self.active_child_mut(&id)?;
259            child.ctx.set_route(route_path, route_params);
260            child.view.handle_params(&mut child.ctx)?;
261        }
262        self.emit_child_lifecycle_telemetry(&id, "params_update", true);
263        self.render_child_update_by_id(&id)
264    }
265
266    /// Suspend a child LiveView while keeping its server-side state.
267    pub fn suspend_child(&mut self, id: impl AsRef<str>) -> LiveResult {
268        let id = NestedLiveViewId::new(id.as_ref());
269        let child = self.child_mut(&id)?;
270        if child.state == NestedLiveViewState::Terminated {
271            return Err(ShellyError::InvalidMessage(format!(
272                "nested LiveView `{id}` is terminated"
273            )));
274        }
275        child.state = NestedLiveViewState::Suspended;
276        child.ctx.set_connected(false);
277        self.emit_child_lifecycle_telemetry(&id, "suspend", true);
278        Ok(())
279    }
280
281    /// Resume a suspended child LiveView and return a child snapshot patch.
282    pub fn resume_child(&mut self, id: impl AsRef<str>) -> LiveResult<ServerMessage> {
283        let id = NestedLiveViewId::new(id.as_ref());
284        {
285            let child = self.child_mut(&id)?;
286            if child.state == NestedLiveViewState::Terminated {
287                return Err(ShellyError::InvalidMessage(format!(
288                    "nested LiveView `{id}` is terminated"
289                )));
290            }
291            child.state = NestedLiveViewState::Mounted;
292            child.ctx.set_connected(true);
293        }
294        self.emit_child_lifecycle_telemetry(&id, "resume", true);
295        self.render_child_update_by_id(&id)
296    }
297
298    /// Terminate a child LiveView and patch its target to an empty body.
299    pub fn terminate_child(&mut self, id: impl AsRef<str>) -> LiveResult<ServerMessage> {
300        let id = NestedLiveViewId::new(id.as_ref());
301        let target = {
302            let child = self.child_mut(&id)?;
303            child.state = NestedLiveViewState::Terminated;
304            child.ctx.set_connected(false);
305            child.ctx.target_dom_id().to_string()
306        };
307        self.revision += 1;
308        let message = ServerMessage::Patch {
309            target,
310            html: String::new(),
311            revision: self.revision,
312        };
313        self.emit_child_lifecycle_telemetry(&id, "terminate", true);
314        self.emit_telemetry(
315            self.telemetry_event(TelemetryEventKind::Patch)
316                .with_bytes(0)
317                .with_count(1)
318                .with_attribute("nested_live_view".to_string(), Value::Bool(true))
319                .with_attribute("nested_child_id".to_string(), Value::String(id.to_string())),
320        );
321        Ok(message)
322    }
323
324    /// Enable deterministic session trace capture.
325    pub fn enable_trace_capture(&mut self, policy: TraceRedactionPolicy) {
326        self.trace_recorder = Some(SessionTraceRecorder::new(
327            SessionReplayMetadata {
328                protocol: PROTOCOL_VERSION.to_string(),
329                session_id: self.session_id().to_string(),
330                tenant_id: self.tenant_id().map(ToString::to_string),
331                target_id: self.ctx.target_dom_id().to_string(),
332                route_path: self.route_path().to_string(),
333                route_params: self.route_params().clone(),
334            },
335            policy,
336        ));
337    }
338
339    /// Disable session trace capture and drop buffered trace state.
340    pub fn disable_trace_capture(&mut self) {
341        self.trace_recorder = None;
342    }
343
344    /// Whether deterministic trace capture is currently enabled.
345    pub fn trace_capture_enabled(&self) -> bool {
346        self.trace_recorder.is_some()
347    }
348
349    /// Clone the current replay trace artifact, if capture is enabled.
350    pub fn trace_artifact(&self) -> Option<SessionReplayTrace> {
351        self.trace_recorder
352            .as_ref()
353            .map(SessionTraceRecorder::artifact)
354    }
355
356    /// Move the replay trace artifact out of this session.
357    pub fn take_trace_artifact(&mut self) -> Option<SessionReplayTrace> {
358        self.trace_recorder
359            .take()
360            .map(SessionTraceRecorder::into_artifact)
361    }
362
363    fn emit_telemetry(&self, event: TelemetryEvent) {
364        let _ = self.telemetry.emit(event);
365    }
366
367    fn telemetry_event(&self, kind: TelemetryEventKind) -> TelemetryEvent {
368        let mut event = TelemetryEvent::new(kind)
369            .with_session(self.session_id().to_string())
370            .with_route(self.ctx.route_path().to_string());
371        if let Some(tenant_id) = self.tenant_id() {
372            event = event.with_attribute(
373                "tenant_id".to_string(),
374                Value::String(tenant_id.to_string()),
375            );
376        }
377        event
378    }
379
380    fn effective_render_cadence_ms(&self) -> u64 {
381        self.ctx
382            .render_cadence_override_ms()
383            .unwrap_or(self.default_render_cadence_ms)
384    }
385
386    fn reset_coalesced_render_state(&mut self) {
387        self.coalesced_render_count = 0;
388        self.coalesced_render_since = None;
389        self.render_flush_deadline = None;
390    }
391
392    fn mark_render_sent_now(&mut self) {
393        if self.coalesced_render_count > 0 {
394            self.ctx.cancel_schedule_internal(INTERNAL_RENDER_FLUSH_ID);
395        }
396        self.last_render_sent_at = Some(Instant::now());
397        self.reset_coalesced_render_state();
398    }
399
400    fn emit_render_cadence_telemetry(
401        &self,
402        phase: &'static str,
403        cadence_ms: u64,
404        coalesced_count: usize,
405        latency_ms: Option<u64>,
406    ) {
407        self.emit_telemetry(
408            self.telemetry_event(TelemetryEventKind::RenderCadence)
409                .with_ok(true)
410                .with_count(coalesced_count.max(1))
411                .with_latency_ms(latency_ms.unwrap_or(0))
412                .with_attribute("render_phase".to_string(), Value::String(phase.to_string()))
413                .with_attribute(
414                    "render_cadence_ms".to_string(),
415                    Value::Number(serde_json::Number::from(cadence_ms)),
416                )
417                .with_attribute(
418                    "coalesced_count".to_string(),
419                    Value::Number(serde_json::Number::from(coalesced_count)),
420                ),
421        );
422    }
423
424    fn schedule_render_flush(&mut self, delay: Duration) {
425        self.ctx.schedule_internal_once(
426            INTERNAL_RENDER_FLUSH_ID,
427            delay.as_millis() as u64,
428            INTERNAL_RENDER_FLUSH_EVENT,
429            json!({"internal": true}),
430        );
431    }
432
433    fn child_mut(&mut self, id: &NestedLiveViewId) -> LiveResult<&mut NestedLiveViewInstance> {
434        self.children.get_mut(id).ok_or_else(|| {
435            ShellyError::InvalidMessage(format!("nested LiveView `{id}` is not mounted"))
436        })
437    }
438
439    fn active_child_mut(
440        &mut self,
441        id: &NestedLiveViewId,
442    ) -> LiveResult<&mut NestedLiveViewInstance> {
443        let child = self.child_mut(id)?;
444        match child.state {
445            NestedLiveViewState::Mounted => Ok(child),
446            NestedLiveViewState::Suspended => Err(ShellyError::InvalidMessage(format!(
447                "nested LiveView `{id}` is suspended"
448            ))),
449            NestedLiveViewState::Terminated => Err(ShellyError::InvalidMessage(format!(
450                "nested LiveView `{id}` is terminated"
451            ))),
452        }
453    }
454
455    fn child_id_for_target(&self, target: &str) -> Option<NestedLiveViewId> {
456        let id = NestedLiveViewId::new(target);
457        if self.children.contains_key(&id) {
458            return Some(id);
459        }
460        self.children
461            .iter()
462            .find_map(|(id, child)| (child.ctx.target_dom_id() == target).then(|| id.clone()))
463    }
464
465    fn emit_child_lifecycle_telemetry(
466        &self,
467        id: &NestedLiveViewId,
468        lifecycle: &'static str,
469        ok: bool,
470    ) {
471        self.emit_telemetry(
472            self.telemetry_event(TelemetryEventKind::Mount)
473                .with_ok(ok)
474                .with_attribute("nested_live_view".to_string(), Value::Bool(true))
475                .with_attribute(
476                    "nested_lifecycle".to_string(),
477                    Value::String(lifecycle.to_string()),
478                )
479                .with_attribute("nested_child_id".to_string(), Value::String(id.to_string())),
480        );
481    }
482
483    fn render_child_update_by_id(&mut self, id: &NestedLiveViewId) -> LiveResult<ServerMessage> {
484        let (target, next, previous) = {
485            let child = self.active_child_mut(id)?;
486            let next = child.view.render();
487            let previous = child.last_render.clone();
488            child.last_render = Some(next.clone());
489            let _ = child.ctx.take_render_after_event();
490            (child.ctx.target_dom_id().to_string(), next, previous)
491        };
492
493        self.revision += 1;
494        let diff = diff_template(previous.as_ref(), &next);
495        let message = match diff {
496            Some(slots) => ServerMessage::Diff {
497                target,
498                revision: self.revision,
499                slots,
500            },
501            None => ServerMessage::Patch {
502                target,
503                html: next.as_str().to_string(),
504                revision: self.revision,
505            },
506        };
507
508        match &message {
509            ServerMessage::Diff { slots, .. } => {
510                let bytes = slots.iter().map(|slot| slot.html.len()).sum();
511                self.emit_telemetry(
512                    self.telemetry_event(TelemetryEventKind::Diff)
513                        .with_bytes(bytes)
514                        .with_count(slots.len())
515                        .with_attribute("nested_live_view".to_string(), Value::Bool(true))
516                        .with_attribute(
517                            "nested_child_id".to_string(),
518                            Value::String(id.to_string()),
519                        ),
520                );
521            }
522            ServerMessage::Patch { html, .. } => {
523                self.emit_telemetry(
524                    self.telemetry_event(TelemetryEventKind::Patch)
525                        .with_bytes(html.len())
526                        .with_count(1)
527                        .with_attribute("nested_live_view".to_string(), Value::Bool(true))
528                        .with_attribute(
529                            "nested_child_id".to_string(),
530                            Value::String(id.to_string()),
531                        ),
532                );
533            }
534            _ => {}
535        }
536
537        Ok(message)
538    }
539
540    fn handle_child_event(&mut self, id: NestedLiveViewId, mut event: Event) -> Vec<ServerMessage> {
541        event.target = Some(id.to_string());
542        let result = {
543            let child = match self.active_child_mut(&id) {
544                Ok(child) => child,
545                Err(err) => {
546                    return vec![ServerMessage::Error {
547                        message: err.to_string(),
548                        code: Some("nested_live_view_unavailable".to_string()),
549                    }];
550                }
551            };
552            child.view.handle_event(event, &mut child.ctx)
553        };
554
555        if let Err(err) = result {
556            if let Ok(child) = self.child_mut(&id) {
557                let _ = child.ctx.drain_pushes();
558                let _ = child.ctx.drain_pubsub_commands();
559                let _ = child.ctx.drain_runtime_commands();
560                let _ = child.ctx.take_render_after_event();
561            }
562            return vec![ServerMessage::Error {
563                message: err.to_string(),
564                code: Some("nested_event_failed".to_string()),
565            }];
566        }
567
568        let (mut messages, render) = match self.child_mut(&id) {
569            Ok(child) => (
570                child.ctx.drain_pushes(),
571                child.ctx.take_render_after_event(),
572            ),
573            Err(err) => {
574                return vec![ServerMessage::Error {
575                    message: err.to_string(),
576                    code: Some("nested_live_view_unavailable".to_string()),
577                }];
578            }
579        };
580        if render {
581            match self.render_child_update_by_id(&id) {
582                Ok(message) => messages.push(message),
583                Err(err) => messages.push(ServerMessage::Error {
584                    message: err.to_string(),
585                    code: Some("nested_event_failed".to_string()),
586                }),
587            }
588        }
589        messages
590    }
591
592    fn coalesce_or_render_root_update(&mut self, messages: &mut Vec<ServerMessage>) {
593        let cadence_ms = self.effective_render_cadence_ms();
594        if cadence_ms == 0 {
595            if self.coalesced_render_count > 0 {
596                let since = self.coalesced_render_since;
597                self.emit_render_cadence_telemetry(
598                    "flush_immediate_override",
599                    cadence_ms,
600                    self.coalesced_render_count,
601                    since.map(|started| started.elapsed().as_millis() as u64),
602                );
603            }
604            messages.push(self.render_update());
605            return;
606        }
607
608        let now = Instant::now();
609        let elapsed_since_render = self
610            .last_render_sent_at
611            .map(|at| now.saturating_duration_since(at));
612
613        let can_render_now = match elapsed_since_render {
614            None => true,
615            Some(elapsed) => elapsed.as_millis() as u64 >= cadence_ms,
616        };
617
618        if can_render_now {
619            if self.coalesced_render_count > 0 {
620                let since = self.coalesced_render_since;
621                self.emit_render_cadence_telemetry(
622                    "flush",
623                    cadence_ms,
624                    self.coalesced_render_count,
625                    since.map(|started| started.elapsed().as_millis() as u64),
626                );
627            } else {
628                self.emit_render_cadence_telemetry("immediate", cadence_ms, 0, Some(0));
629            }
630            messages.push(self.render_update());
631            return;
632        }
633
634        let elapsed_ms = elapsed_since_render
635            .map(|elapsed| elapsed.as_millis() as u64)
636            .unwrap_or(0);
637        let remaining_ms = cadence_ms.saturating_sub(elapsed_ms).max(1);
638        self.coalesced_render_count += 1;
639        if self.coalesced_render_since.is_none() {
640            self.coalesced_render_since = Some(now);
641        }
642        self.emit_render_cadence_telemetry(
643            "coalesced",
644            cadence_ms,
645            self.coalesced_render_count,
646            Some(remaining_ms),
647        );
648        let deadline = now + Duration::from_millis(remaining_ms);
649        let should_schedule = match self.render_flush_deadline {
650            Some(existing_deadline) => deadline < existing_deadline,
651            None => true,
652        };
653        if should_schedule {
654            self.render_flush_deadline = Some(deadline);
655            self.schedule_render_flush(Duration::from_millis(remaining_ms));
656        }
657    }
658
659    fn handle_internal_render_flush_event(&mut self) -> Vec<ServerMessage> {
660        if self.coalesced_render_count == 0 {
661            return Vec::new();
662        }
663        self.render_flush_deadline = None;
664
665        let mut messages = Vec::new();
666        self.coalesce_or_render_root_update(&mut messages);
667        messages
668    }
669
670    /// Render the current view into an opaque HTML fragment.
671    pub fn render_html(&self) -> Html {
672        self.view.render()
673    }
674
675    /// Update route data and call the view's route-param hook.
676    pub fn patch_route(
677        &mut self,
678        route_path: impl Into<String>,
679        route_params: BTreeMap<String, String>,
680    ) -> LiveResult {
681        self.ctx.set_route(route_path, route_params);
682        self.view.handle_params(&mut self.ctx)
683    }
684
685    /// Build the protocol hello message.
686    pub fn hello(&self) -> ServerMessage {
687        ServerMessage::Hello {
688            session_id: self.session_id().to_string(),
689            target: self.ctx.target_dom_id().to_string(),
690            revision: self.revision,
691            protocol: PROTOCOL_VERSION.to_string(),
692            server_revision: None,
693            resume_status: None,
694            resume_reason: None,
695            resume_token: None,
696            resume_expires_in_ms: None,
697        }
698    }
699
700    /// Render the current view as a browser patch.
701    pub fn render_patch(&mut self) -> ServerMessage {
702        self.revision += 1;
703        let html = self.view.render();
704        let html_string = html.as_str().to_string();
705        let bytes = html_string.len();
706        self.last_render = Some(html);
707        let message = ServerMessage::Patch {
708            target: self.ctx.target_dom_id().to_string(),
709            html: html_string,
710            revision: self.revision,
711        };
712        self.emit_telemetry(
713            self.telemetry_event(TelemetryEventKind::Patch)
714                .with_bytes(bytes)
715                .with_count(1),
716        );
717        self.mark_render_sent_now();
718        message
719    }
720
721    /// Render the current view as a dynamic diff when possible, otherwise a full patch.
722    pub fn render_update(&mut self) -> ServerMessage {
723        let next = self.view.render();
724        let diff = self.diff_against_last_render(&next);
725        self.revision += 1;
726
727        let message = match diff {
728            Some(slots) => ServerMessage::Diff {
729                target: self.ctx.target_dom_id().to_string(),
730                revision: self.revision,
731                slots,
732            },
733            None => ServerMessage::Patch {
734                target: self.ctx.target_dom_id().to_string(),
735                html: next.as_str().to_string(),
736                revision: self.revision,
737            },
738        };
739
740        match &message {
741            ServerMessage::Diff { slots, .. } => {
742                let bytes = slots.iter().map(|slot| slot.html.len()).sum();
743                self.emit_telemetry(
744                    self.telemetry_event(TelemetryEventKind::Diff)
745                        .with_bytes(bytes)
746                        .with_count(slots.len()),
747                );
748            }
749            ServerMessage::Patch { html, .. } => {
750                self.emit_telemetry(
751                    self.telemetry_event(TelemetryEventKind::Patch)
752                        .with_bytes(html.len())
753                        .with_count(1),
754                );
755            }
756            _ => {}
757        }
758
759        self.last_render = Some(next);
760        self.mark_render_sent_now();
761        message
762    }
763
764    /// Render the current view into a patch at the current revision.
765    ///
766    /// This is used by reconnect reconciliation paths where the server must
767    /// resend the latest snapshot without incrementing revision.
768    pub fn render_snapshot_patch(&mut self) -> ServerMessage {
769        let html = self.view.render();
770        let html_string = html.as_str().to_string();
771        self.last_render = Some(html);
772        self.emit_telemetry(
773            self.telemetry_event(TelemetryEventKind::Patch)
774                .with_bytes(html_string.len())
775                .with_count(1)
776                .with_attribute("snapshot".to_string(), Value::Bool(true)),
777        );
778        let message = ServerMessage::Patch {
779            target: self.ctx.target_dom_id().to_string(),
780            html: html_string,
781            revision: self.revision,
782        };
783        self.mark_render_sent_now();
784        message
785    }
786
787    /// Render one component as an independent browser patch.
788    pub fn render_component_update(&mut self, render: ComponentRender) -> ServerMessage {
789        self.revision += 1;
790        let (id, html) = render.into_parts();
791        let message = ServerMessage::Patch {
792            target: id.to_string(),
793            html: html.as_str().to_string(),
794            revision: self.revision,
795        };
796        if let ServerMessage::Patch { html, .. } = &message {
797            self.emit_telemetry(
798                self.telemetry_event(TelemetryEventKind::Patch)
799                    .with_bytes(html.len())
800                    .with_count(1)
801                    .with_attribute("component_patch".to_string(), Value::Bool(true)),
802            );
803        }
804        message
805    }
806
807    /// Handle one client protocol message and return zero or more server messages.
808    pub fn handle_client_message(&mut self, message: ClientMessage) -> Vec<ServerMessage> {
809        let started = Instant::now();
810        let revision_before = self.revision;
811        let captured_message = message.clone();
812        if let Some(tenant_id) = tenant_id_for_client_message(&message) {
813            self.set_tenant_id(tenant_id);
814        }
815        let event_name = match &message {
816            ClientMessage::Event { event, .. } if event != INTERNAL_RENDER_FLUSH_EVENT => {
817                Some(event.clone())
818            }
819            _ => None,
820        };
821
822        let messages = match message {
823            ClientMessage::Connect { protocol, .. } => {
824                if protocol == PROTOCOL_VERSION {
825                    Vec::new()
826                } else {
827                    vec![ServerMessage::Error {
828                        message: format!(
829                            "unsupported protocol in connect: expected {PROTOCOL_VERSION}, got {protocol}"
830                        ),
831                        code: Some("unsupported_protocol".to_string()),
832                    }]
833                }
834            }
835            ClientMessage::Ping { nonce } => vec![ServerMessage::Pong { nonce }],
836            ClientMessage::PatchUrl { to } => vec![ServerMessage::PatchUrl { to }],
837            ClientMessage::Navigate { to } => vec![ServerMessage::Navigate { to }],
838            ClientMessage::UploadStart { .. }
839            | ClientMessage::UploadChunk { .. }
840            | ClientMessage::UploadComplete { .. } => vec![ServerMessage::Error {
841                message: "upload protocol messages must be handled by a transport adapter"
842                    .to_string(),
843                code: Some("unsupported_upload_transport".to_string()),
844            }],
845            event_message @ ClientMessage::Event { .. } => {
846                let is_internal_flush = matches!(
847                    &event_message,
848                    ClientMessage::Event { event, target, .. }
849                        if event == INTERNAL_RENDER_FLUSH_EVENT && target.is_none()
850                );
851                if is_internal_flush {
852                    self.handle_internal_render_flush_event()
853                } else {
854                    match Event::try_from(event_message) {
855                        Err(err) => vec![ServerMessage::Error {
856                            message: err.to_string(),
857                            code: Some("invalid_event".to_string()),
858                        }],
859                        Ok(event) => {
860                            if let Some(target) = event.target.clone() {
861                                if let Some(child_id) = self.child_id_for_target(&target) {
862                                    self.handle_child_event(child_id, event)
863                                } else {
864                                    match self.view.handle_component_event(
865                                        &target,
866                                        event.clone(),
867                                        &mut self.ctx,
868                                    ) {
869                                        Ok(Some(render)) => {
870                                            let mut messages = self.ctx.drain_pushes();
871                                            if self.ctx.take_render_after_event() {
872                                                messages.push(self.render_component_update(render));
873                                            }
874                                            messages
875                                        }
876                                        Ok(None) => {
877                                            if let Err(err) =
878                                                self.view.handle_event(event, &mut self.ctx)
879                                            {
880                                                let _ = self.ctx.drain_pushes();
881                                                let _ = self.ctx.drain_pubsub_commands();
882                                                let _ = self.ctx.drain_runtime_commands();
883                                                let _ = self.ctx.take_render_after_event();
884                                                vec![ServerMessage::Error {
885                                                    message: err.to_string(),
886                                                    code: Some("event_failed".to_string()),
887                                                }]
888                                            } else {
889                                                let mut messages = self.ctx.drain_pushes();
890                                                if self.ctx.take_render_after_event() {
891                                                    self.coalesce_or_render_root_update(
892                                                        &mut messages,
893                                                    );
894                                                }
895                                                messages
896                                            }
897                                        }
898                                        Err(err) => {
899                                            let _ = self.ctx.drain_pushes();
900                                            let _ = self.ctx.drain_pubsub_commands();
901                                            let _ = self.ctx.drain_runtime_commands();
902                                            let _ = self.ctx.take_render_after_event();
903                                            vec![ServerMessage::Error {
904                                                message: err.to_string(),
905                                                code: Some("event_failed".to_string()),
906                                            }]
907                                        }
908                                    }
909                                }
910                            } else if let Err(err) = self.view.handle_event(event, &mut self.ctx) {
911                                let _ = self.ctx.drain_pushes();
912                                let _ = self.ctx.drain_pubsub_commands();
913                                let _ = self.ctx.drain_runtime_commands();
914                                let _ = self.ctx.take_render_after_event();
915                                vec![ServerMessage::Error {
916                                    message: err.to_string(),
917                                    code: Some("event_failed".to_string()),
918                                }]
919                            } else {
920                                let mut messages = self.ctx.drain_pushes();
921                                if self.ctx.take_render_after_event() {
922                                    self.coalesce_or_render_root_update(&mut messages);
923                                }
924                                messages
925                            }
926                        }
927                    }
928                }
929            }
930        };
931
932        for message in &messages {
933            match message {
934                ServerMessage::StreamInsert { .. } => {
935                    self.emit_telemetry(
936                        self.telemetry_event(TelemetryEventKind::StreamInsert)
937                            .with_count(1),
938                    );
939                }
940                ServerMessage::StreamDelete { .. } => {
941                    self.emit_telemetry(
942                        self.telemetry_event(TelemetryEventKind::StreamDelete)
943                            .with_count(1),
944                    );
945                }
946                ServerMessage::StreamBatch { operations, .. } => {
947                    let inserts = operations
948                        .iter()
949                        .filter(|operation| {
950                            matches!(operation, crate::StreamBatchOperation::Insert { .. })
951                        })
952                        .count();
953                    let deletes = operations
954                        .iter()
955                        .filter(|operation| {
956                            matches!(operation, crate::StreamBatchOperation::Delete { .. })
957                        })
958                        .count();
959                    if inserts > 0 {
960                        self.emit_telemetry(
961                            self.telemetry_event(TelemetryEventKind::StreamInsert)
962                                .with_count(inserts),
963                        );
964                    }
965                    if deletes > 0 {
966                        self.emit_telemetry(
967                            self.telemetry_event(TelemetryEventKind::StreamDelete)
968                                .with_count(deletes),
969                        );
970                    }
971                }
972                ServerMessage::Error { .. } => {
973                    self.emit_telemetry(
974                        self.telemetry_event(TelemetryEventKind::Error)
975                            .with_ok(false)
976                            .with_count(1),
977                    );
978                }
979                _ => {}
980            }
981        }
982
983        let ok = messages
984            .iter()
985            .all(|message| !matches!(message, ServerMessage::Error { .. }));
986        let mut event_telemetry = self
987            .telemetry_event(TelemetryEventKind::HandleEvent)
988            .with_ok(ok)
989            .with_latency_ms(started.elapsed().as_millis() as u64)
990            .with_count(messages.len());
991        if let Some(name) = event_name {
992            event_telemetry = event_telemetry.with_event_name(name);
993        }
994        self.emit_telemetry(event_telemetry);
995
996        if let Some(recorder) = self.trace_recorder.as_mut() {
997            recorder.record_turn(&captured_message, &messages, revision_before, self.revision);
998        }
999
1000        messages
1001    }
1002
1003    fn diff_against_last_render(&self, next: &Html) -> Option<Vec<DynamicSlotPatch>> {
1004        diff_template(self.last_render.as_ref(), next)
1005    }
1006}
1007
1008fn diff_template(previous: Option<&Html>, next: &Html) -> Option<Vec<DynamicSlotPatch>> {
1009    let previous = previous?.template()?;
1010    let next_template = next.template()?;
1011    if !previous.compatible_with(next_template) {
1012        return None;
1013    }
1014
1015    Some(
1016        previous
1017            .dynamic_segments()
1018            .iter()
1019            .zip(next_template.dynamic_segments())
1020            .enumerate()
1021            .filter_map(|(index, (before, after))| {
1022                if before == after {
1023                    None
1024                } else {
1025                    Some(DynamicSlotPatch {
1026                        index,
1027                        html: after.clone(),
1028                    })
1029                }
1030            })
1031            .collect(),
1032    )
1033}
1034
1035fn tenant_id_for_client_message(message: &ClientMessage) -> Option<String> {
1036    match message {
1037        ClientMessage::Connect { tenant_id, .. } => normalize_tenant_id(tenant_id.as_deref()),
1038        ClientMessage::Event {
1039            metadata, value, ..
1040        } => metadata
1041            .get("tenant_id")
1042            .and_then(Value::as_str)
1043            .or_else(|| value.get("tenant_id").and_then(Value::as_str))
1044            .and_then(|value| normalize_tenant_id(Some(value))),
1045        _ => None,
1046    }
1047}
1048
1049fn normalize_tenant_id(value: Option<&str>) -> Option<String> {
1050    value
1051        .map(str::trim)
1052        .filter(|value| !value.is_empty())
1053        .map(ToString::to_string)
1054}
1055
1056impl Drop for LiveSession {
1057    fn drop(&mut self) {
1058        self.ctx.set_connected(false);
1059    }
1060}
1061
1062#[cfg(test)]
1063mod tests {
1064    use super::{LiveSession, INTERNAL_RENDER_FLUSH_EVENT};
1065    use crate::{
1066        ClientMessage, ComponentId, ComponentRender, Context, DynamicSlotPatch, Event, Html,
1067        LiveComponent, LiveResult, LiveView, MemoryTelemetrySink, NestedLiveViewState,
1068        RuntimeCommand, ServerMessage, ShellyError, StreamBatchOperation, TelemetryEvent,
1069        TelemetryEventKind, TelemetrySink, Template, TraceRedactionPolicy,
1070    };
1071    use serde_json::{json, Value};
1072    use std::{collections::BTreeMap, sync::Arc, thread, time::Duration};
1073
1074    #[derive(Default)]
1075    struct Counter {
1076        count: i64,
1077    }
1078
1079    impl LiveView for Counter {
1080        fn handle_event(&mut self, event: Event, _ctx: &mut Context) -> LiveResult {
1081            match event.name.as_str() {
1082                "inc" => self.count += 1,
1083                "dec" => self.count -= 1,
1084                _ => {}
1085            }
1086
1087            Ok(())
1088        }
1089
1090        fn render(&self) -> Html {
1091            Html::new(format!("<p>{}</p>", self.count))
1092        }
1093    }
1094
1095    #[test]
1096    fn session_dispatches_event_and_patches() {
1097        let mut session = LiveSession::new(Box::<Counter>::default(), "root");
1098        session.mount().unwrap();
1099
1100        let messages = session.handle_client_message(ClientMessage::Event {
1101            event: "inc".to_string(),
1102            target: None,
1103            value: Value::Null,
1104            metadata: Default::default(),
1105        });
1106
1107        assert_eq!(messages.len(), 1);
1108        assert_eq!(
1109            messages[0],
1110            ServerMessage::Patch {
1111                target: "root".to_string(),
1112                html: "<p>1</p>".to_string(),
1113                revision: 1,
1114            }
1115        );
1116    }
1117
1118    #[test]
1119    fn session_responds_to_ping_without_rendering() {
1120        let mut session = LiveSession::new(Box::<Counter>::default(), "root");
1121        let messages = session.handle_client_message(ClientMessage::Ping {
1122            nonce: Some("abc".to_string()),
1123        });
1124
1125        assert_eq!(
1126            messages,
1127            vec![ServerMessage::Pong {
1128                nonce: Some("abc".to_string())
1129            }]
1130        );
1131        assert_eq!(session.revision(), 0);
1132    }
1133
1134    #[test]
1135    fn connect_with_supported_protocol_is_noop() {
1136        let mut session = LiveSession::new(Box::<Counter>::default(), "root");
1137        let messages = session.handle_client_message(ClientMessage::Connect {
1138            protocol: "shelly/1".to_string(),
1139            session_id: None,
1140            last_revision: None,
1141            resume_token: None,
1142            tenant_id: None,
1143            trace_id: None,
1144            span_id: None,
1145            parent_span_id: None,
1146            correlation_id: None,
1147            request_id: None,
1148        });
1149        assert!(messages.is_empty());
1150    }
1151
1152    #[test]
1153    fn connect_with_unsupported_protocol_returns_structured_error() {
1154        let mut session = LiveSession::new(Box::<Counter>::default(), "root");
1155        let messages = session.handle_client_message(ClientMessage::Connect {
1156            protocol: "shelly/2".to_string(),
1157            session_id: None,
1158            last_revision: None,
1159            resume_token: None,
1160            tenant_id: None,
1161            trace_id: None,
1162            span_id: None,
1163            parent_span_id: None,
1164            correlation_id: None,
1165            request_id: None,
1166        });
1167        assert_eq!(
1168            messages,
1169            vec![ServerMessage::Error {
1170                message: "unsupported protocol in connect: expected shelly/1, got shelly/2"
1171                    .to_string(),
1172                code: Some("unsupported_protocol".to_string()),
1173            }]
1174        );
1175    }
1176
1177    #[test]
1178    fn connect_and_event_propagate_tenant_context() {
1179        let mut session = LiveSession::new(Box::<Counter>::default(), "root");
1180        let _ = session.handle_client_message(ClientMessage::Connect {
1181            protocol: "shelly/1".to_string(),
1182            session_id: None,
1183            last_revision: None,
1184            resume_token: None,
1185            tenant_id: Some("tenant-a".to_string()),
1186            trace_id: None,
1187            span_id: None,
1188            parent_span_id: None,
1189            correlation_id: None,
1190            request_id: None,
1191        });
1192        assert_eq!(session.tenant_id(), Some("tenant-a"));
1193
1194        let _ = session.handle_client_message(ClientMessage::Event {
1195            event: "inc".to_string(),
1196            target: None,
1197            value: json!({"tenant_id": "tenant-b"}),
1198            metadata: Default::default(),
1199        });
1200        assert_eq!(session.tenant_id(), Some("tenant-b"));
1201    }
1202
1203    #[test]
1204    fn unknown_events_do_not_panic() {
1205        let mut session = LiveSession::new(Box::<Counter>::default(), "root");
1206        session.mount().unwrap();
1207
1208        let messages = session.handle_client_message(ClientMessage::Event {
1209            event: "unknown".to_string(),
1210            target: None,
1211            value: Value::Null,
1212            metadata: Default::default(),
1213        });
1214
1215        assert_eq!(
1216            messages,
1217            vec![ServerMessage::Patch {
1218                target: "root".to_string(),
1219                html: "<p>0</p>".to_string(),
1220                revision: 1,
1221            }]
1222        );
1223    }
1224
1225    #[derive(Default)]
1226    struct FailingEvent {
1227        count: i64,
1228    }
1229
1230    impl LiveView for FailingEvent {
1231        fn handle_event(&mut self, event: Event, ctx: &mut Context) -> LiveResult {
1232            match event.name.as_str() {
1233                "fail" => {
1234                    ctx.redirect("/should-not-leak");
1235                    Err(ShellyError::Event("boom".to_string()))
1236                }
1237                "inc" => {
1238                    self.count += 1;
1239                    Ok(())
1240                }
1241                _ => Ok(()),
1242            }
1243        }
1244
1245        fn render(&self) -> Html {
1246            Html::new(format!("<p>{}</p>", self.count))
1247        }
1248    }
1249
1250    #[test]
1251    fn event_errors_do_not_increment_revision_or_leak_pushes() {
1252        let mut session = LiveSession::new(Box::<FailingEvent>::default(), "root");
1253        session.mount().unwrap();
1254
1255        let failure = session.handle_client_message(ClientMessage::Event {
1256            event: "fail".to_string(),
1257            target: None,
1258            value: Value::Null,
1259            metadata: Default::default(),
1260        });
1261
1262        assert_eq!(
1263            failure,
1264            vec![ServerMessage::Error {
1265                message: "event error: boom".to_string(),
1266                code: Some("event_failed".to_string()),
1267            }]
1268        );
1269        assert_eq!(session.revision(), 0);
1270
1271        let success = session.handle_client_message(ClientMessage::Event {
1272            event: "inc".to_string(),
1273            target: None,
1274            value: Value::Null,
1275            metadata: Default::default(),
1276        });
1277
1278        assert_eq!(
1279            success,
1280            vec![ServerMessage::Patch {
1281                target: "root".to_string(),
1282                html: "<p>1</p>".to_string(),
1283                revision: 1,
1284            }]
1285        );
1286    }
1287
1288    #[derive(Default)]
1289    struct TemplateCounter {
1290        count: i64,
1291    }
1292
1293    impl LiveView for TemplateCounter {
1294        fn handle_event(&mut self, event: Event, _ctx: &mut Context) -> LiveResult {
1295            if event.name == "inc" {
1296                self.count += 1;
1297            }
1298            Ok(())
1299        }
1300
1301        fn render(&self) -> Html {
1302            Template::new(vec!["<p>Count: ", "</p>"], vec![self.count.to_string()]).into()
1303        }
1304    }
1305
1306    #[test]
1307    fn template_updates_send_only_changed_dynamic_slots() {
1308        let mut session = LiveSession::new(Box::<TemplateCounter>::default(), "root");
1309        session.mount().unwrap();
1310
1311        assert_eq!(
1312            session.render_patch(),
1313            ServerMessage::Patch {
1314                target: "root".to_string(),
1315                html: r#"<p>Count: <span data-shelly-slot="0">0</span></p>"#.to_string(),
1316                revision: 1,
1317            }
1318        );
1319
1320        let update = session.handle_client_message(ClientMessage::Event {
1321            event: "inc".to_string(),
1322            target: None,
1323            value: Value::Null,
1324            metadata: Default::default(),
1325        });
1326
1327        assert_eq!(
1328            update,
1329            vec![ServerMessage::Diff {
1330                target: "root".to_string(),
1331                revision: 2,
1332                slots: vec![DynamicSlotPatch {
1333                    index: 0,
1334                    html: "1".to_string(),
1335                }],
1336            }]
1337        );
1338    }
1339
1340    #[test]
1341    fn plain_html_keeps_full_patch_fallback() {
1342        let mut session = LiveSession::new(Box::<Counter>::default(), "root");
1343        session.mount().unwrap();
1344        assert!(matches!(
1345            session.render_patch(),
1346            ServerMessage::Patch { .. }
1347        ));
1348
1349        let update = session.handle_client_message(ClientMessage::Event {
1350            event: "inc".to_string(),
1351            target: None,
1352            value: Value::Null,
1353            metadata: Default::default(),
1354        });
1355
1356        assert!(matches!(update.as_slice(), [ServerMessage::Patch { .. }]));
1357    }
1358
1359    #[derive(Default)]
1360    struct StreamList {
1361        next_id: usize,
1362    }
1363
1364    impl LiveView for StreamList {
1365        fn handle_event(&mut self, event: Event, ctx: &mut Context) -> LiveResult {
1366            match event.name.as_str() {
1367                "append" => {
1368                    self.next_id += 1;
1369                    let id = format!("item-{}", self.next_id);
1370                    ctx.stream_append(
1371                        "items",
1372                        id.clone(),
1373                        format!(r#"<li id="{id}">Item {}</li>"#, self.next_id),
1374                    );
1375                }
1376                "delete" => ctx.stream_delete("items", "item-1"),
1377                _ => {}
1378            }
1379            Ok(())
1380        }
1381
1382        fn render(&self) -> Html {
1383            Html::new(r#"<ul id="items"></ul>"#)
1384        }
1385    }
1386
1387    #[test]
1388    fn stream_events_send_only_stream_operations() {
1389        let mut session = LiveSession::new(Box::<StreamList>::default(), "root");
1390        session.mount().unwrap();
1391        session.render_patch();
1392
1393        let append = session.handle_client_message(ClientMessage::Event {
1394            event: "append".to_string(),
1395            target: None,
1396            value: Value::Null,
1397            metadata: Default::default(),
1398        });
1399
1400        assert_eq!(
1401            append,
1402            vec![ServerMessage::StreamInsert {
1403                target: "items".to_string(),
1404                id: "item-1".to_string(),
1405                html: r#"<li id="item-1">Item 1</li>"#.to_string(),
1406                at: crate::StreamPosition::Append,
1407            }]
1408        );
1409
1410        let delete = session.handle_client_message(ClientMessage::Event {
1411            event: "delete".to_string(),
1412            target: None,
1413            value: Value::Null,
1414            metadata: Default::default(),
1415        });
1416
1417        assert_eq!(
1418            delete,
1419            vec![ServerMessage::StreamDelete {
1420                target: "items".to_string(),
1421                id: "item-1".to_string(),
1422            }]
1423        );
1424    }
1425
1426    #[derive(Default)]
1427    struct StreamBatchList {
1428        next_id: usize,
1429    }
1430
1431    impl LiveView for StreamBatchList {
1432        fn handle_event(&mut self, event: Event, ctx: &mut Context) -> LiveResult {
1433            if event.name == "burst_append" {
1434                let mut items = Vec::new();
1435                for _ in 0..3 {
1436                    self.next_id += 1;
1437                    let id = format!("item-{}", self.next_id);
1438                    let html = format!(r#"<li id="{id}">Item {}</li>"#, self.next_id);
1439                    items.push((id, html));
1440                }
1441                ctx.stream_append_many("items", items);
1442            }
1443            Ok(())
1444        }
1445
1446        fn render(&self) -> Html {
1447            Html::new(r#"<ul id="items"></ul>"#)
1448        }
1449    }
1450
1451    #[test]
1452    fn stream_burst_events_send_batch_operations() {
1453        let mut session = LiveSession::new(Box::<StreamBatchList>::default(), "root");
1454        session.mount().unwrap();
1455        session.render_patch();
1456
1457        let messages = session.handle_client_message(ClientMessage::Event {
1458            event: "burst_append".to_string(),
1459            target: None,
1460            value: Value::Null,
1461            metadata: Default::default(),
1462        });
1463
1464        assert_eq!(messages.len(), 1);
1465        assert_eq!(
1466            messages[0],
1467            ServerMessage::StreamBatch {
1468                target: "items".to_string(),
1469                operations: vec![
1470                    StreamBatchOperation::Insert {
1471                        id: "item-1".to_string(),
1472                        html: r#"<li id="item-1">Item 1</li>"#.to_string(),
1473                        at: crate::StreamPosition::Append,
1474                    },
1475                    StreamBatchOperation::Insert {
1476                        id: "item-2".to_string(),
1477                        html: r#"<li id="item-2">Item 2</li>"#.to_string(),
1478                        at: crate::StreamPosition::Append,
1479                    },
1480                    StreamBatchOperation::Insert {
1481                        id: "item-3".to_string(),
1482                        html: r#"<li id="item-3">Item 3</li>"#.to_string(),
1483                        at: crate::StreamPosition::Append,
1484                    },
1485                ],
1486            }
1487        );
1488    }
1489
1490    #[derive(Default)]
1491    struct RuntimeScheduler;
1492
1493    impl LiveView for RuntimeScheduler {
1494        fn mount(&mut self, ctx: &mut Context) -> LiveResult {
1495            ctx.schedule_interval("heartbeat", 1000, "tick", Value::Null);
1496            Ok(())
1497        }
1498
1499        fn render(&self) -> Html {
1500            Html::new("<p>scheduler</p>")
1501        }
1502    }
1503
1504    #[test]
1505    fn mount_can_queue_runtime_commands() {
1506        let mut session = LiveSession::new(Box::<RuntimeScheduler>::default(), "root");
1507        session.mount().unwrap();
1508
1509        assert_eq!(
1510            session.drain_runtime_commands(),
1511            vec![RuntimeCommand::ScheduleInterval {
1512                id: "heartbeat".to_string(),
1513                every_ms: 1000,
1514                dispatch: crate::RuntimeEvent::new("tick", Value::Null),
1515            }]
1516        );
1517    }
1518
1519    #[derive(Default)]
1520    struct CadencedCounter {
1521        count: i64,
1522    }
1523
1524    impl LiveView for CadencedCounter {
1525        fn mount(&mut self, ctx: &mut Context) -> LiveResult {
1526            ctx.set_render_cadence_ms(25);
1527            Ok(())
1528        }
1529
1530        fn handle_event(&mut self, event: Event, _ctx: &mut Context) -> LiveResult {
1531            if event.name == "inc" {
1532                self.count += 1;
1533            }
1534            Ok(())
1535        }
1536
1537        fn render(&self) -> Html {
1538            Html::new(format!("<p>{}</p>", self.count))
1539        }
1540    }
1541
1542    #[test]
1543    fn render_cadence_coalesces_bursty_root_patches() {
1544        let mut session = LiveSession::new(Box::<CadencedCounter>::default(), "root");
1545        session.mount().unwrap();
1546
1547        let first = session.handle_client_message(ClientMessage::Event {
1548            event: "inc".to_string(),
1549            target: None,
1550            value: Value::Null,
1551            metadata: Default::default(),
1552        });
1553        assert!(matches!(first.as_slice(), [ServerMessage::Patch { .. }]));
1554
1555        let second = session.handle_client_message(ClientMessage::Event {
1556            event: "inc".to_string(),
1557            target: None,
1558            value: Value::Null,
1559            metadata: Default::default(),
1560        });
1561        assert!(second.is_empty());
1562
1563        let runtime = session.drain_runtime_commands();
1564        assert!(runtime.iter().any(|command| matches!(
1565            command,
1566            RuntimeCommand::ScheduleOnce { dispatch, .. }
1567            if dispatch.event == INTERNAL_RENDER_FLUSH_EVENT
1568        )));
1569
1570        thread::sleep(Duration::from_millis(30));
1571        let flush = session.handle_client_message(ClientMessage::Event {
1572            event: INTERNAL_RENDER_FLUSH_EVENT.to_string(),
1573            target: None,
1574            value: Value::Null,
1575            metadata: Default::default(),
1576        });
1577        assert!(matches!(
1578            flush.as_slice(),
1579            [ServerMessage::Patch { html, revision, .. }]
1580            if html == "<p>2</p>" && *revision == 2
1581        ));
1582    }
1583
1584    #[derive(Default)]
1585    struct CadenceOverrideCounter {
1586        count: i64,
1587    }
1588
1589    impl LiveView for CadenceOverrideCounter {
1590        fn mount(&mut self, ctx: &mut Context) -> LiveResult {
1591            ctx.set_render_cadence_ms(100);
1592            Ok(())
1593        }
1594
1595        fn handle_event(&mut self, event: Event, ctx: &mut Context) -> LiveResult {
1596            match event.name.as_str() {
1597                "inc" => self.count += 1,
1598                "disable_cadence" => {
1599                    ctx.clear_render_cadence_override();
1600                    self.count += 1;
1601                }
1602                _ => {}
1603            }
1604            Ok(())
1605        }
1606
1607        fn render(&self) -> Html {
1608            Html::new(format!("<p>{}</p>", self.count))
1609        }
1610    }
1611
1612    #[test]
1613    fn per_view_override_can_disable_render_cadence() {
1614        let mut session = LiveSession::new(Box::<CadenceOverrideCounter>::default(), "root");
1615        session.mount().unwrap();
1616
1617        let first = session.handle_client_message(ClientMessage::Event {
1618            event: "inc".to_string(),
1619            target: None,
1620            value: Value::Null,
1621            metadata: Default::default(),
1622        });
1623        assert!(matches!(first.as_slice(), [ServerMessage::Patch { .. }]));
1624
1625        let coalesced = session.handle_client_message(ClientMessage::Event {
1626            event: "inc".to_string(),
1627            target: None,
1628            value: Value::Null,
1629            metadata: Default::default(),
1630        });
1631        assert!(coalesced.is_empty());
1632
1633        let disabled = session.handle_client_message(ClientMessage::Event {
1634            event: "disable_cadence".to_string(),
1635            target: None,
1636            value: Value::Null,
1637            metadata: Default::default(),
1638        });
1639        assert!(matches!(
1640            disabled.as_slice(),
1641            [ServerMessage::Patch { html, revision, .. }]
1642            if html == "<p>3</p>" && *revision == 2
1643        ));
1644    }
1645
1646    struct FailingTelemetrySink;
1647
1648    impl TelemetrySink for FailingTelemetrySink {
1649        fn emit(&self, _event: TelemetryEvent) -> Result<(), String> {
1650            Err("telemetry downstream unavailable".to_string())
1651        }
1652    }
1653
1654    #[test]
1655    fn telemetry_sink_failures_do_not_break_runtime_flow() {
1656        let mut session = LiveSession::new(Box::<Counter>::default(), "root");
1657        session.set_telemetry_sink(Arc::new(FailingTelemetrySink));
1658        session.mount().unwrap();
1659
1660        let messages = session.handle_client_message(ClientMessage::Event {
1661            event: "inc".to_string(),
1662            target: None,
1663            value: Value::Null,
1664            metadata: Default::default(),
1665        });
1666
1667        assert!(matches!(messages.as_slice(), [ServerMessage::Patch { .. }]));
1668    }
1669
1670    struct TodoItem {
1671        id: ComponentId,
1672        title: String,
1673        done: bool,
1674    }
1675
1676    impl TodoItem {
1677        fn new(id: &str, title: &str) -> Self {
1678            Self {
1679                id: ComponentId::new(id),
1680                title: title.to_string(),
1681                done: false,
1682            }
1683        }
1684    }
1685
1686    impl LiveComponent for TodoItem {
1687        fn id(&self) -> ComponentId {
1688            self.id.clone()
1689        }
1690
1691        fn handle_event(&mut self, event: Event, _ctx: &mut Context) -> LiveResult {
1692            if event.name == "toggle" {
1693                self.done = !self.done;
1694            }
1695            Ok(())
1696        }
1697
1698        fn render(&self) -> Html {
1699            Html::new(format!(
1700                r#"<li id="{id}" data-done="{done}"><span>{title}</span><button shelly-click="toggle" shelly-target="{id}">Toggle</button></li>"#,
1701                id = self.id,
1702                done = self.done,
1703                title = self.title,
1704            ))
1705        }
1706    }
1707
1708    struct TodoList {
1709        items: Vec<TodoItem>,
1710    }
1711
1712    impl Default for TodoList {
1713        fn default() -> Self {
1714            Self {
1715                items: vec![
1716                    TodoItem::new("todo-1", "Write docs"),
1717                    TodoItem::new("todo-2", "Ship components"),
1718                ],
1719            }
1720        }
1721    }
1722
1723    impl LiveView for TodoList {
1724        fn handle_component_event(
1725            &mut self,
1726            target: &str,
1727            event: Event,
1728            ctx: &mut Context,
1729        ) -> LiveResult<Option<ComponentRender>> {
1730            let Some(item) = self
1731                .items
1732                .iter_mut()
1733                .find(|item| item.id().as_str() == target)
1734            else {
1735                return Ok(None);
1736            };
1737
1738            item.handle_event(event, ctx)?;
1739            Ok(Some(ComponentRender::new(item.id(), item.render())))
1740        }
1741
1742        fn render(&self) -> Html {
1743            let items = self
1744                .items
1745                .iter()
1746                .map(|item| item.render().into_string())
1747                .collect::<String>();
1748            Html::new(format!(r#"<ul>{items}</ul>"#))
1749        }
1750    }
1751
1752    #[test]
1753    fn scoped_component_event_patches_only_target_component() {
1754        let mut session = LiveSession::new(Box::<TodoList>::default(), "root");
1755        session.mount().unwrap();
1756
1757        let initial = session.render_patch();
1758        assert!(matches!(
1759            initial,
1760            ServerMessage::Patch { ref html, .. } if html.contains("todo-1")
1761        ));
1762
1763        let update = session.handle_client_message(ClientMessage::Event {
1764            event: "toggle".to_string(),
1765            target: Some("todo-2".to_string()),
1766            value: Value::Null,
1767            metadata: Default::default(),
1768        });
1769
1770        assert_eq!(update.len(), 1);
1771        assert_eq!(
1772            update[0],
1773            ServerMessage::Patch {
1774                target: "todo-2".to_string(),
1775                html: r#"<li id="todo-2" data-done="true"><span>Ship components</span><button shelly-click="toggle" shelly-target="todo-2">Toggle</button></li>"#.to_string(),
1776                revision: 2,
1777            }
1778        );
1779    }
1780
1781    #[derive(Default)]
1782    struct ChildCounter {
1783        count: i64,
1784    }
1785
1786    impl LiveView for ChildCounter {
1787        fn mount(&mut self, ctx: &mut Context) -> LiveResult {
1788            ctx.schedule_once_to("child-tick", 10, "child-a", "tick", Value::Null);
1789            Ok(())
1790        }
1791
1792        fn handle_event(&mut self, event: Event, _ctx: &mut Context) -> LiveResult {
1793            match event.name.as_str() {
1794                "inc" => self.count += 1,
1795                "fail" => return Err(ShellyError::Event("child boom".to_string())),
1796                _ => {}
1797            }
1798            Ok(())
1799        }
1800
1801        fn render(&self) -> Html {
1802            Html::new(format!(
1803                r#"<section id="child-a">child:{}</section>"#,
1804                self.count
1805            ))
1806        }
1807    }
1808
1809    #[test]
1810    fn nested_live_view_mounts_and_handles_scoped_events_without_parent_remount() {
1811        let mut session = LiveSession::new(Box::<Counter>::default(), "root");
1812        session.mount().unwrap();
1813
1814        let child_patch = session
1815            .mount_child("child-a", Box::<ChildCounter>::default(), "child-a")
1816            .expect("child should mount");
1817        assert_eq!(
1818            child_patch,
1819            ServerMessage::Patch {
1820                target: "child-a".to_string(),
1821                html: r#"<section id="child-a">child:0</section>"#.to_string(),
1822                revision: 1,
1823            }
1824        );
1825        assert_eq!(
1826            session.child_state("child-a"),
1827            Some(NestedLiveViewState::Mounted)
1828        );
1829        assert_eq!(
1830            session.drain_runtime_commands(),
1831            vec![RuntimeCommand::ScheduleOnce {
1832                id: "child-tick".to_string(),
1833                delay_ms: 10,
1834                dispatch: crate::RuntimeEvent::with_target("child-a", "tick", Value::Null),
1835            }]
1836        );
1837
1838        let update = session.handle_client_message(ClientMessage::Event {
1839            event: "inc".to_string(),
1840            target: Some("child-a".to_string()),
1841            value: Value::Null,
1842            metadata: Default::default(),
1843        });
1844
1845        assert_eq!(
1846            update,
1847            vec![ServerMessage::Patch {
1848                target: "child-a".to_string(),
1849                html: r#"<section id="child-a">child:1</section>"#.to_string(),
1850                revision: 2,
1851            }]
1852        );
1853
1854        let parent_snapshot = session.render_snapshot_patch();
1855        assert!(matches!(
1856            parent_snapshot,
1857            ServerMessage::Patch { html, revision, .. }
1858            if html == "<p>0</p>" && revision == 2
1859        ));
1860    }
1861
1862    #[test]
1863    fn nested_live_view_lifecycle_suspend_resume_and_terminate_are_isolated() {
1864        let mut session = LiveSession::new(Box::<Counter>::default(), "root");
1865        session.mount().unwrap();
1866        let _ = session
1867            .mount_child("child-a", Box::<ChildCounter>::default(), "child-a")
1868            .expect("child should mount");
1869
1870        session.suspend_child("child-a").expect("suspend child");
1871        assert_eq!(
1872            session.child_state("child-a"),
1873            Some(NestedLiveViewState::Suspended)
1874        );
1875        let blocked = session.handle_client_message(ClientMessage::Event {
1876            event: "inc".to_string(),
1877            target: Some("child-a".to_string()),
1878            value: Value::Null,
1879            metadata: Default::default(),
1880        });
1881        assert!(matches!(
1882            blocked.as_slice(),
1883            [ServerMessage::Error { code: Some(code), .. }]
1884            if code == "nested_live_view_unavailable"
1885        ));
1886        assert_eq!(session.revision(), 1);
1887
1888        let resumed = session.resume_child("child-a").expect("resume child");
1889        assert!(matches!(
1890            resumed,
1891            ServerMessage::Patch { target, revision, .. }
1892            if target == "child-a" && revision == 2
1893        ));
1894
1895        let terminated = session.terminate_child("child-a").expect("terminate child");
1896        assert_eq!(
1897            terminated,
1898            ServerMessage::Patch {
1899                target: "child-a".to_string(),
1900                html: String::new(),
1901                revision: 3,
1902            }
1903        );
1904        assert_eq!(
1905            session.child_state("child-a"),
1906            Some(NestedLiveViewState::Terminated)
1907        );
1908    }
1909
1910    #[test]
1911    fn nested_live_view_event_failure_does_not_corrupt_parent_or_siblings() {
1912        let mut session = LiveSession::new(Box::<Counter>::default(), "root");
1913        session.mount().unwrap();
1914        let _ = session
1915            .mount_child("child-a", Box::<ChildCounter>::default(), "child-a")
1916            .expect("child a should mount");
1917        let _ = session
1918            .mount_child("child-b", Box::<ChildCounter>::default(), "child-b")
1919            .expect("child b should mount");
1920
1921        let failure = session.handle_client_message(ClientMessage::Event {
1922            event: "fail".to_string(),
1923            target: Some("child-a".to_string()),
1924            value: Value::Null,
1925            metadata: Default::default(),
1926        });
1927        assert!(matches!(
1928            failure.as_slice(),
1929            [ServerMessage::Error { code: Some(code), .. }]
1930            if code == "nested_event_failed"
1931        ));
1932        assert_eq!(session.revision(), 2);
1933
1934        let sibling = session.handle_client_message(ClientMessage::Event {
1935            event: "inc".to_string(),
1936            target: Some("child-b".to_string()),
1937            value: Value::Null,
1938            metadata: Default::default(),
1939        });
1940        assert!(matches!(
1941            sibling.as_slice(),
1942            [ServerMessage::Patch { target, revision, .. }]
1943            if target == "child-b" && *revision == 3
1944        ));
1945
1946        let parent = session.handle_client_message(ClientMessage::Event {
1947            event: "inc".to_string(),
1948            target: None,
1949            value: Value::Null,
1950            metadata: Default::default(),
1951        });
1952        assert!(matches!(
1953            parent.as_slice(),
1954            [ServerMessage::Patch { target, html, revision }]
1955            if target == "root" && html == "<p>1</p>" && *revision == 4
1956        ));
1957    }
1958
1959    #[test]
1960    fn session_emits_mount_and_event_telemetry() {
1961        let telemetry = Arc::new(MemoryTelemetrySink::new());
1962        let mut session = LiveSession::new(Box::<Counter>::default(), "root");
1963        session.set_telemetry_sink(telemetry.clone());
1964        session.mount().unwrap();
1965        let _ = session.handle_client_message(ClientMessage::Event {
1966            event: "inc".to_string(),
1967            target: None,
1968            value: Value::Null,
1969            metadata: Default::default(),
1970        });
1971
1972        let events = telemetry.events();
1973        assert!(events
1974            .iter()
1975            .any(|event| event.kind == TelemetryEventKind::Mount));
1976        assert!(events
1977            .iter()
1978            .any(|event| event.kind == TelemetryEventKind::HandleEvent));
1979        assert!(events
1980            .iter()
1981            .any(|event| event.kind == TelemetryEventKind::Patch));
1982    }
1983
1984    #[test]
1985    fn session_public_helpers_cover_route_hello_snapshot_and_trace_paths() {
1986        let mut route_params = BTreeMap::new();
1987        route_params.insert("team".to_string(), "core".to_string());
1988        let mut session = LiveSession::new_with_route(
1989            Box::<Counter>::default(),
1990            "root",
1991            "/teams/core",
1992            route_params.clone(),
1993        );
1994
1995        assert_eq!(session.route_path(), "/teams/core");
1996        assert_eq!(session.route_params(), &route_params);
1997        assert_eq!(session.render_html().as_str(), "<p>0</p>");
1998
1999        session.set_default_render_cadence_ms(10);
2000        let hello = session.hello();
2001        assert!(matches!(
2002            hello,
2003            ServerMessage::Hello { ref target, revision, .. } if target == "root" && revision == 0
2004        ));
2005
2006        let mut patched_params = BTreeMap::new();
2007        patched_params.insert("team".to_string(), "platform".to_string());
2008        session
2009            .patch_route("/teams/platform", patched_params.clone())
2010            .unwrap();
2011        assert_eq!(session.route_path(), "/teams/platform");
2012        assert_eq!(session.route_params(), &patched_params);
2013
2014        session.enable_trace_capture(TraceRedactionPolicy::none());
2015        assert!(session.trace_capture_enabled());
2016        let _ = session.handle_client_message(ClientMessage::Event {
2017            event: "inc".to_string(),
2018            target: None,
2019            value: Value::Null,
2020            metadata: Default::default(),
2021        });
2022        assert!(session.trace_artifact().is_some());
2023
2024        let snapshot = session.render_snapshot_patch();
2025        assert!(matches!(
2026            snapshot,
2027            ServerMessage::Patch { ref html, revision, .. } if html == "<p>1</p>" && revision == session.revision()
2028        ));
2029
2030        let artifact = session.take_trace_artifact();
2031        assert!(artifact.is_some());
2032        assert!(!session.trace_capture_enabled());
2033        session.disable_trace_capture();
2034        assert!(session.trace_artifact().is_none());
2035    }
2036
2037    #[derive(Default)]
2038    struct MountFailureView;
2039
2040    impl LiveView for MountFailureView {
2041        fn mount(&mut self, ctx: &mut Context) -> LiveResult {
2042            ctx.redirect("/mount-failure");
2043            ctx.subscribe("alerts");
2044            ctx.schedule_once("retry", 10, "retry", Value::Null);
2045            Err(ShellyError::Event("mount failed".to_string()))
2046        }
2047
2048        fn render(&self) -> Html {
2049            Html::new("<p>mount-failure</p>")
2050        }
2051    }
2052
2053    #[test]
2054    fn mount_failure_drains_side_effects_and_keeps_session_safe() {
2055        let mut session = LiveSession::new(Box::<MountFailureView>::default(), "root");
2056        let err = session.mount().unwrap_err().to_string();
2057        assert!(err.contains("mount failed"));
2058        assert!(session.drain_pubsub_commands().is_empty());
2059        assert!(session.drain_runtime_commands().is_empty());
2060        assert_eq!(session.revision(), 0);
2061    }
2062
2063    #[derive(Default)]
2064    struct ComponentErrorView;
2065
2066    impl LiveView for ComponentErrorView {
2067        fn handle_component_event(
2068            &mut self,
2069            _target: &str,
2070            _event: Event,
2071            _ctx: &mut Context,
2072        ) -> LiveResult<Option<ComponentRender>> {
2073            Err(ShellyError::Event("component handler failed".to_string()))
2074        }
2075
2076        fn render(&self) -> Html {
2077            Html::new("<p>component</p>")
2078        }
2079    }
2080
2081    #[test]
2082    fn protocol_paths_cover_navigation_upload_and_component_error_branches() {
2083        let mut session = LiveSession::new(Box::<Counter>::default(), "root");
2084        session.mount().unwrap();
2085
2086        assert_eq!(
2087            session.handle_client_message(ClientMessage::PatchUrl {
2088                to: "/inbox".to_string()
2089            }),
2090            vec![ServerMessage::PatchUrl {
2091                to: "/inbox".to_string()
2092            }]
2093        );
2094        assert_eq!(
2095            session.handle_client_message(ClientMessage::Navigate {
2096                to: "/settings".to_string()
2097            }),
2098            vec![ServerMessage::Navigate {
2099                to: "/settings".to_string()
2100            }]
2101        );
2102
2103        for message in [
2104            ClientMessage::UploadStart {
2105                upload_id: "up-1".to_string(),
2106                event: "upload".to_string(),
2107                target: None,
2108                name: "avatar.png".to_string(),
2109                size: 12,
2110                content_type: Some("image/png".to_string()),
2111            },
2112            ClientMessage::UploadChunk {
2113                upload_id: "up-1".to_string(),
2114                offset: 0,
2115                data: "aaa".to_string(),
2116            },
2117            ClientMessage::UploadComplete {
2118                upload_id: "up-1".to_string(),
2119            },
2120        ] {
2121            let response = session.handle_client_message(message);
2122            assert!(matches!(
2123                response.as_slice(),
2124                [ServerMessage::Error { code: Some(code), .. }] if code == "unsupported_upload_transport"
2125            ));
2126        }
2127
2128        let _ = session.handle_client_message(ClientMessage::Connect {
2129            protocol: "shelly/1".to_string(),
2130            session_id: None,
2131            last_revision: None,
2132            resume_token: None,
2133            tenant_id: Some("   ".to_string()),
2134            trace_id: None,
2135            span_id: None,
2136            parent_span_id: None,
2137            correlation_id: None,
2138            request_id: None,
2139        });
2140        assert_eq!(session.tenant_id(), None);
2141
2142        let mut component_session = LiveSession::new(Box::<ComponentErrorView>::default(), "root");
2143        component_session.mount().unwrap();
2144        let component_failure = component_session.handle_client_message(ClientMessage::Event {
2145            event: "toggle".to_string(),
2146            target: Some("todo-1".to_string()),
2147            value: Value::Null,
2148            metadata: Default::default(),
2149        });
2150        assert!(matches!(
2151            component_failure.as_slice(),
2152            [ServerMessage::Error { code: Some(code), .. }] if code == "event_failed"
2153        ));
2154    }
2155
2156    #[derive(Default)]
2157    struct StreamInsertDeleteBatchView;
2158
2159    impl LiveView for StreamInsertDeleteBatchView {
2160        fn handle_event(&mut self, event: Event, ctx: &mut Context) -> LiveResult {
2161            if event.name == "batch" {
2162                ctx.stream_batch(
2163                    "items",
2164                    vec![
2165                        StreamBatchOperation::Insert {
2166                            id: "item-1".to_string(),
2167                            html: "<li>Item 1</li>".to_string(),
2168                            at: crate::StreamPosition::Append,
2169                        },
2170                        StreamBatchOperation::Delete {
2171                            id: "item-0".to_string(),
2172                        },
2173                    ],
2174                );
2175            }
2176            Ok(())
2177        }
2178
2179        fn render(&self) -> Html {
2180            Html::new("<ul id=\"items\"></ul>")
2181        }
2182    }
2183
2184    #[test]
2185    fn stream_batch_delete_telemetry_and_empty_flush_paths_are_exercised() {
2186        let telemetry = Arc::new(MemoryTelemetrySink::new());
2187        let mut session = LiveSession::new(Box::<StreamInsertDeleteBatchView>::default(), "root");
2188        session.set_telemetry_sink(telemetry.clone());
2189        session.mount().unwrap();
2190        let _ = session.render_patch();
2191
2192        let batch = session.handle_client_message(ClientMessage::Event {
2193            event: "batch".to_string(),
2194            target: None,
2195            value: Value::Null,
2196            metadata: Default::default(),
2197        });
2198        assert!(matches!(
2199            batch.as_slice(),
2200            [ServerMessage::StreamBatch { .. }]
2201        ));
2202
2203        let empty_flush = session.handle_client_message(ClientMessage::Event {
2204            event: INTERNAL_RENDER_FLUSH_EVENT.to_string(),
2205            target: None,
2206            value: Value::Null,
2207            metadata: Default::default(),
2208        });
2209        assert!(empty_flush.is_empty());
2210
2211        let telemetry_events = telemetry.events();
2212        assert!(telemetry_events.iter().any(|event| {
2213            event.kind == TelemetryEventKind::StreamInsert && event.count == Some(1)
2214        }));
2215        assert!(telemetry_events.iter().any(|event| {
2216            event.kind == TelemetryEventKind::StreamDelete && event.count == Some(1)
2217        }));
2218    }
2219}