1use crate::nested::NestedLiveViewInstance;
2use crate::{
3 ClientMessage, ComponentRender, Context, DynamicSlotPatch, Event, Html, LiveResult, LiveView,
4 NestedLiveViewId, NestedLiveViewSnapshot, NestedLiveViewState, NoopTelemetrySink,
5 ServerMessage, SessionId, SessionReplayMetadata, SessionReplayTrace, SessionTraceRecorder,
6 ShellyError, TelemetryEvent, TelemetryEventKind, TelemetrySink, TraceRedactionPolicy,
7 PROTOCOL_VERSION_V1,
8};
9use serde_json::{json, Value};
10use std::{
11 collections::BTreeMap,
12 sync::Arc,
13 time::{Duration, Instant},
14};
15
16const PROTOCOL_VERSION: &str = PROTOCOL_VERSION_V1;
17const INTERNAL_RENDER_FLUSH_ID: &str = "__shelly_internal_render_flush";
18pub const INTERNAL_RENDER_FLUSH_EVENT: &str = "__shelly_internal_render_flush";
19
20pub struct LiveSession {
22 view: Box<dyn LiveView>,
23 ctx: Context,
24 revision: u64,
25 last_render: Option<Html>,
26 telemetry: Arc<dyn TelemetrySink>,
27 default_render_cadence_ms: u64,
28 last_render_sent_at: Option<Instant>,
29 coalesced_render_count: usize,
30 coalesced_render_since: Option<Instant>,
31 render_flush_deadline: Option<Instant>,
32 trace_recorder: Option<SessionTraceRecorder>,
33 children: BTreeMap<NestedLiveViewId, NestedLiveViewInstance>,
34}
35
36impl LiveSession {
37 pub fn new(view: Box<dyn LiveView>, target_dom_id: impl Into<String>) -> Self {
39 Self::new_with_session_id(view, SessionId::new().to_string(), target_dom_id)
40 }
41
42 pub fn new_with_session_id(
44 view: Box<dyn LiveView>,
45 session_id: impl Into<String>,
46 target_dom_id: impl Into<String>,
47 ) -> Self {
48 Self::new_with_route_and_session_id(view, session_id, target_dom_id, "/", BTreeMap::new())
49 }
50
51 pub fn new_with_route(
53 view: Box<dyn LiveView>,
54 target_dom_id: impl Into<String>,
55 route_path: impl Into<String>,
56 route_params: BTreeMap<String, String>,
57 ) -> Self {
58 Self::new_with_route_and_session_id(
59 view,
60 SessionId::new().to_string(),
61 target_dom_id,
62 route_path,
63 route_params,
64 )
65 }
66
67 pub fn new_with_route_and_session_id(
69 view: Box<dyn LiveView>,
70 session_id: impl Into<String>,
71 target_dom_id: impl Into<String>,
72 route_path: impl Into<String>,
73 route_params: BTreeMap<String, String>,
74 ) -> Self {
75 let mut ctx =
76 Context::new_with_session_id(SessionId::from_string(session_id.into()), target_dom_id);
77 ctx.set_route(route_path, route_params);
78 Self {
79 view,
80 ctx,
81 revision: 0,
82 last_render: None,
83 telemetry: Arc::new(NoopTelemetrySink),
84 default_render_cadence_ms: 0,
85 last_render_sent_at: None,
86 coalesced_render_count: 0,
87 coalesced_render_since: None,
88 render_flush_deadline: None,
89 trace_recorder: None,
90 children: BTreeMap::new(),
91 }
92 }
93
94 pub fn set_telemetry_sink(&mut self, telemetry: Arc<dyn TelemetrySink>) {
96 self.telemetry = telemetry;
97 }
98
99 pub fn set_default_render_cadence_ms(&mut self, cadence_ms: u64) {
103 self.default_render_cadence_ms = cadence_ms;
104 }
105
106 pub fn mount(&mut self) -> LiveResult {
108 let started = Instant::now();
109 self.ctx.set_connected(true);
110 let result = self
111 .view
112 .mount(&mut self.ctx)
113 .and_then(|_| self.view.handle_params(&mut self.ctx));
114 if result.is_err() {
115 self.ctx.set_connected(false);
116 let _ = self.ctx.drain_pushes();
117 let _ = self.ctx.drain_pubsub_commands();
118 let _ = self.ctx.drain_runtime_commands();
119 }
120 self.emit_telemetry(
121 self.telemetry_event(TelemetryEventKind::Mount)
122 .with_ok(result.is_ok())
123 .with_latency_ms(started.elapsed().as_millis() as u64),
124 );
125 result
126 }
127
128 pub fn drain_pubsub_commands(&mut self) -> Vec<crate::PubSubCommand> {
130 let mut commands = self.ctx.drain_pubsub_commands();
131 for child in self.children.values_mut() {
132 commands.extend(child.ctx.drain_pubsub_commands());
133 }
134 commands
135 }
136
137 pub fn drain_runtime_commands(&mut self) -> Vec<crate::RuntimeCommand> {
139 let mut commands = self.ctx.drain_runtime_commands();
140 for child in self.children.values_mut() {
141 commands.extend(child.ctx.drain_runtime_commands());
142 }
143 commands
144 }
145
146 pub fn session_id(&self) -> &str {
148 self.ctx.session_id().as_str()
149 }
150
151 pub fn route_path(&self) -> &str {
153 self.ctx.route_path()
154 }
155
156 pub fn route_params(&self) -> &BTreeMap<String, String> {
158 self.ctx.route_params()
159 }
160
161 pub fn tenant_id(&self) -> Option<&str> {
163 self.ctx.tenant_id()
164 }
165
166 pub fn set_tenant_id(&mut self, tenant_id: impl Into<String>) {
168 self.ctx.set_tenant_id(tenant_id);
169 }
170
171 pub fn set_tenant_id_optional(&mut self, tenant_id: Option<String>) {
173 self.ctx.set_tenant_id_optional(tenant_id);
174 }
175
176 pub fn revision(&self) -> u64 {
178 self.revision
179 }
180
181 pub fn child_snapshots(&self) -> Vec<NestedLiveViewSnapshot> {
183 self.children
184 .iter()
185 .map(|(id, child)| child.snapshot(id.clone()))
186 .collect()
187 }
188
189 pub fn child_state(&self, id: impl AsRef<str>) -> Option<NestedLiveViewState> {
191 self.children
192 .get(&NestedLiveViewId::new(id.as_ref()))
193 .map(|child| child.state)
194 }
195
196 pub fn mount_child(
198 &mut self,
199 id: impl Into<NestedLiveViewId>,
200 view: Box<dyn LiveView>,
201 target_dom_id: impl Into<String>,
202 ) -> LiveResult<ServerMessage> {
203 let route_path = self.route_path().to_string();
204 let route_params = self.route_params().clone();
205 self.mount_child_with_route(id, view, target_dom_id, route_path, route_params)
206 }
207
208 pub fn mount_child_with_route(
210 &mut self,
211 id: impl Into<NestedLiveViewId>,
212 view: Box<dyn LiveView>,
213 target_dom_id: impl Into<String>,
214 route_path: impl Into<String>,
215 route_params: BTreeMap<String, String>,
216 ) -> LiveResult<ServerMessage> {
217 let id = id.into();
218 let target_dom_id = target_dom_id.into();
219 if self.children.contains_key(&id) {
220 return Err(ShellyError::InvalidMessage(format!(
221 "nested LiveView `{id}` is already mounted"
222 )));
223 }
224
225 let mut ctx = Context::new_with_session_id(
226 SessionId::from_string(self.session_id().to_string()),
227 target_dom_id,
228 );
229 ctx.set_connected(true);
230 ctx.set_tenant_id_optional(self.tenant_id().map(ToString::to_string));
231 ctx.set_route(route_path, route_params);
232
233 let mut child = NestedLiveViewInstance {
234 view,
235 ctx,
236 state: NestedLiveViewState::Mounted,
237 last_render: None,
238 };
239
240 child.view.mount(&mut child.ctx)?;
241 child.view.handle_params(&mut child.ctx)?;
242
243 self.children.insert(id.clone(), child);
244 let patch = self.render_child_update_by_id(&id)?;
245 self.emit_child_lifecycle_telemetry(&id, "mount", true);
246 Ok(patch)
247 }
248
249 pub fn update_child_params(
251 &mut self,
252 id: impl AsRef<str>,
253 route_path: impl Into<String>,
254 route_params: BTreeMap<String, String>,
255 ) -> LiveResult<ServerMessage> {
256 let id = NestedLiveViewId::new(id.as_ref());
257 {
258 let child = self.active_child_mut(&id)?;
259 child.ctx.set_route(route_path, route_params);
260 child.view.handle_params(&mut child.ctx)?;
261 }
262 self.emit_child_lifecycle_telemetry(&id, "params_update", true);
263 self.render_child_update_by_id(&id)
264 }
265
266 pub fn suspend_child(&mut self, id: impl AsRef<str>) -> LiveResult {
268 let id = NestedLiveViewId::new(id.as_ref());
269 let child = self.child_mut(&id)?;
270 if child.state == NestedLiveViewState::Terminated {
271 return Err(ShellyError::InvalidMessage(format!(
272 "nested LiveView `{id}` is terminated"
273 )));
274 }
275 child.state = NestedLiveViewState::Suspended;
276 child.ctx.set_connected(false);
277 self.emit_child_lifecycle_telemetry(&id, "suspend", true);
278 Ok(())
279 }
280
281 pub fn resume_child(&mut self, id: impl AsRef<str>) -> LiveResult<ServerMessage> {
283 let id = NestedLiveViewId::new(id.as_ref());
284 {
285 let child = self.child_mut(&id)?;
286 if child.state == NestedLiveViewState::Terminated {
287 return Err(ShellyError::InvalidMessage(format!(
288 "nested LiveView `{id}` is terminated"
289 )));
290 }
291 child.state = NestedLiveViewState::Mounted;
292 child.ctx.set_connected(true);
293 }
294 self.emit_child_lifecycle_telemetry(&id, "resume", true);
295 self.render_child_update_by_id(&id)
296 }
297
298 pub fn terminate_child(&mut self, id: impl AsRef<str>) -> LiveResult<ServerMessage> {
300 let id = NestedLiveViewId::new(id.as_ref());
301 let target = {
302 let child = self.child_mut(&id)?;
303 child.state = NestedLiveViewState::Terminated;
304 child.ctx.set_connected(false);
305 child.ctx.target_dom_id().to_string()
306 };
307 self.revision += 1;
308 let message = ServerMessage::Patch {
309 target,
310 html: String::new(),
311 revision: self.revision,
312 };
313 self.emit_child_lifecycle_telemetry(&id, "terminate", true);
314 self.emit_telemetry(
315 self.telemetry_event(TelemetryEventKind::Patch)
316 .with_bytes(0)
317 .with_count(1)
318 .with_attribute("nested_live_view".to_string(), Value::Bool(true))
319 .with_attribute("nested_child_id".to_string(), Value::String(id.to_string())),
320 );
321 Ok(message)
322 }
323
324 pub fn enable_trace_capture(&mut self, policy: TraceRedactionPolicy) {
326 self.trace_recorder = Some(SessionTraceRecorder::new(
327 SessionReplayMetadata {
328 protocol: PROTOCOL_VERSION.to_string(),
329 session_id: self.session_id().to_string(),
330 tenant_id: self.tenant_id().map(ToString::to_string),
331 target_id: self.ctx.target_dom_id().to_string(),
332 route_path: self.route_path().to_string(),
333 route_params: self.route_params().clone(),
334 },
335 policy,
336 ));
337 }
338
339 pub fn disable_trace_capture(&mut self) {
341 self.trace_recorder = None;
342 }
343
344 pub fn trace_capture_enabled(&self) -> bool {
346 self.trace_recorder.is_some()
347 }
348
349 pub fn trace_artifact(&self) -> Option<SessionReplayTrace> {
351 self.trace_recorder
352 .as_ref()
353 .map(SessionTraceRecorder::artifact)
354 }
355
356 pub fn take_trace_artifact(&mut self) -> Option<SessionReplayTrace> {
358 self.trace_recorder
359 .take()
360 .map(SessionTraceRecorder::into_artifact)
361 }
362
363 fn emit_telemetry(&self, event: TelemetryEvent) {
364 let _ = self.telemetry.emit(event);
365 }
366
367 fn telemetry_event(&self, kind: TelemetryEventKind) -> TelemetryEvent {
368 let mut event = TelemetryEvent::new(kind)
369 .with_session(self.session_id().to_string())
370 .with_route(self.ctx.route_path().to_string());
371 if let Some(tenant_id) = self.tenant_id() {
372 event = event.with_attribute(
373 "tenant_id".to_string(),
374 Value::String(tenant_id.to_string()),
375 );
376 }
377 event
378 }
379
380 fn effective_render_cadence_ms(&self) -> u64 {
381 self.ctx
382 .render_cadence_override_ms()
383 .unwrap_or(self.default_render_cadence_ms)
384 }
385
386 fn reset_coalesced_render_state(&mut self) {
387 self.coalesced_render_count = 0;
388 self.coalesced_render_since = None;
389 self.render_flush_deadline = None;
390 }
391
392 fn mark_render_sent_now(&mut self) {
393 if self.coalesced_render_count > 0 {
394 self.ctx.cancel_schedule_internal(INTERNAL_RENDER_FLUSH_ID);
395 }
396 self.last_render_sent_at = Some(Instant::now());
397 self.reset_coalesced_render_state();
398 }
399
400 fn emit_render_cadence_telemetry(
401 &self,
402 phase: &'static str,
403 cadence_ms: u64,
404 coalesced_count: usize,
405 latency_ms: Option<u64>,
406 ) {
407 self.emit_telemetry(
408 self.telemetry_event(TelemetryEventKind::RenderCadence)
409 .with_ok(true)
410 .with_count(coalesced_count.max(1))
411 .with_latency_ms(latency_ms.unwrap_or(0))
412 .with_attribute("render_phase".to_string(), Value::String(phase.to_string()))
413 .with_attribute(
414 "render_cadence_ms".to_string(),
415 Value::Number(serde_json::Number::from(cadence_ms)),
416 )
417 .with_attribute(
418 "coalesced_count".to_string(),
419 Value::Number(serde_json::Number::from(coalesced_count)),
420 ),
421 );
422 }
423
424 fn schedule_render_flush(&mut self, delay: Duration) {
425 self.ctx.schedule_internal_once(
426 INTERNAL_RENDER_FLUSH_ID,
427 delay.as_millis() as u64,
428 INTERNAL_RENDER_FLUSH_EVENT,
429 json!({"internal": true}),
430 );
431 }
432
433 fn child_mut(&mut self, id: &NestedLiveViewId) -> LiveResult<&mut NestedLiveViewInstance> {
434 self.children.get_mut(id).ok_or_else(|| {
435 ShellyError::InvalidMessage(format!("nested LiveView `{id}` is not mounted"))
436 })
437 }
438
439 fn active_child_mut(
440 &mut self,
441 id: &NestedLiveViewId,
442 ) -> LiveResult<&mut NestedLiveViewInstance> {
443 let child = self.child_mut(id)?;
444 match child.state {
445 NestedLiveViewState::Mounted => Ok(child),
446 NestedLiveViewState::Suspended => Err(ShellyError::InvalidMessage(format!(
447 "nested LiveView `{id}` is suspended"
448 ))),
449 NestedLiveViewState::Terminated => Err(ShellyError::InvalidMessage(format!(
450 "nested LiveView `{id}` is terminated"
451 ))),
452 }
453 }
454
455 fn child_id_for_target(&self, target: &str) -> Option<NestedLiveViewId> {
456 let id = NestedLiveViewId::new(target);
457 if self.children.contains_key(&id) {
458 return Some(id);
459 }
460 self.children
461 .iter()
462 .find_map(|(id, child)| (child.ctx.target_dom_id() == target).then(|| id.clone()))
463 }
464
465 fn emit_child_lifecycle_telemetry(
466 &self,
467 id: &NestedLiveViewId,
468 lifecycle: &'static str,
469 ok: bool,
470 ) {
471 self.emit_telemetry(
472 self.telemetry_event(TelemetryEventKind::Mount)
473 .with_ok(ok)
474 .with_attribute("nested_live_view".to_string(), Value::Bool(true))
475 .with_attribute(
476 "nested_lifecycle".to_string(),
477 Value::String(lifecycle.to_string()),
478 )
479 .with_attribute("nested_child_id".to_string(), Value::String(id.to_string())),
480 );
481 }
482
483 fn render_child_update_by_id(&mut self, id: &NestedLiveViewId) -> LiveResult<ServerMessage> {
484 let (target, next, previous) = {
485 let child = self.active_child_mut(id)?;
486 let next = child.view.render();
487 let previous = child.last_render.clone();
488 child.last_render = Some(next.clone());
489 let _ = child.ctx.take_render_after_event();
490 (child.ctx.target_dom_id().to_string(), next, previous)
491 };
492
493 self.revision += 1;
494 let diff = diff_template(previous.as_ref(), &next);
495 let message = match diff {
496 Some(slots) => ServerMessage::Diff {
497 target,
498 revision: self.revision,
499 slots,
500 },
501 None => ServerMessage::Patch {
502 target,
503 html: next.as_str().to_string(),
504 revision: self.revision,
505 },
506 };
507
508 match &message {
509 ServerMessage::Diff { slots, .. } => {
510 let bytes = slots.iter().map(|slot| slot.html.len()).sum();
511 self.emit_telemetry(
512 self.telemetry_event(TelemetryEventKind::Diff)
513 .with_bytes(bytes)
514 .with_count(slots.len())
515 .with_attribute("nested_live_view".to_string(), Value::Bool(true))
516 .with_attribute(
517 "nested_child_id".to_string(),
518 Value::String(id.to_string()),
519 ),
520 );
521 }
522 ServerMessage::Patch { html, .. } => {
523 self.emit_telemetry(
524 self.telemetry_event(TelemetryEventKind::Patch)
525 .with_bytes(html.len())
526 .with_count(1)
527 .with_attribute("nested_live_view".to_string(), Value::Bool(true))
528 .with_attribute(
529 "nested_child_id".to_string(),
530 Value::String(id.to_string()),
531 ),
532 );
533 }
534 _ => {}
535 }
536
537 Ok(message)
538 }
539
540 fn handle_child_event(&mut self, id: NestedLiveViewId, mut event: Event) -> Vec<ServerMessage> {
541 event.target = Some(id.to_string());
542 let result = {
543 let child = match self.active_child_mut(&id) {
544 Ok(child) => child,
545 Err(err) => {
546 return vec![ServerMessage::Error {
547 message: err.to_string(),
548 code: Some("nested_live_view_unavailable".to_string()),
549 }];
550 }
551 };
552 child.view.handle_event(event, &mut child.ctx)
553 };
554
555 if let Err(err) = result {
556 if let Ok(child) = self.child_mut(&id) {
557 let _ = child.ctx.drain_pushes();
558 let _ = child.ctx.drain_pubsub_commands();
559 let _ = child.ctx.drain_runtime_commands();
560 let _ = child.ctx.take_render_after_event();
561 }
562 return vec![ServerMessage::Error {
563 message: err.to_string(),
564 code: Some("nested_event_failed".to_string()),
565 }];
566 }
567
568 let (mut messages, render) = match self.child_mut(&id) {
569 Ok(child) => (
570 child.ctx.drain_pushes(),
571 child.ctx.take_render_after_event(),
572 ),
573 Err(err) => {
574 return vec![ServerMessage::Error {
575 message: err.to_string(),
576 code: Some("nested_live_view_unavailable".to_string()),
577 }];
578 }
579 };
580 if render {
581 match self.render_child_update_by_id(&id) {
582 Ok(message) => messages.push(message),
583 Err(err) => messages.push(ServerMessage::Error {
584 message: err.to_string(),
585 code: Some("nested_event_failed".to_string()),
586 }),
587 }
588 }
589 messages
590 }
591
592 fn coalesce_or_render_root_update(&mut self, messages: &mut Vec<ServerMessage>) {
593 let cadence_ms = self.effective_render_cadence_ms();
594 if cadence_ms == 0 {
595 if self.coalesced_render_count > 0 {
596 let since = self.coalesced_render_since;
597 self.emit_render_cadence_telemetry(
598 "flush_immediate_override",
599 cadence_ms,
600 self.coalesced_render_count,
601 since.map(|started| started.elapsed().as_millis() as u64),
602 );
603 }
604 messages.push(self.render_update());
605 return;
606 }
607
608 let now = Instant::now();
609 let elapsed_since_render = self
610 .last_render_sent_at
611 .map(|at| now.saturating_duration_since(at));
612
613 let can_render_now = match elapsed_since_render {
614 None => true,
615 Some(elapsed) => elapsed.as_millis() as u64 >= cadence_ms,
616 };
617
618 if can_render_now {
619 if self.coalesced_render_count > 0 {
620 let since = self.coalesced_render_since;
621 self.emit_render_cadence_telemetry(
622 "flush",
623 cadence_ms,
624 self.coalesced_render_count,
625 since.map(|started| started.elapsed().as_millis() as u64),
626 );
627 } else {
628 self.emit_render_cadence_telemetry("immediate", cadence_ms, 0, Some(0));
629 }
630 messages.push(self.render_update());
631 return;
632 }
633
634 let elapsed_ms = elapsed_since_render
635 .map(|elapsed| elapsed.as_millis() as u64)
636 .unwrap_or(0);
637 let remaining_ms = cadence_ms.saturating_sub(elapsed_ms).max(1);
638 self.coalesced_render_count += 1;
639 if self.coalesced_render_since.is_none() {
640 self.coalesced_render_since = Some(now);
641 }
642 self.emit_render_cadence_telemetry(
643 "coalesced",
644 cadence_ms,
645 self.coalesced_render_count,
646 Some(remaining_ms),
647 );
648 let deadline = now + Duration::from_millis(remaining_ms);
649 let should_schedule = match self.render_flush_deadline {
650 Some(existing_deadline) => deadline < existing_deadline,
651 None => true,
652 };
653 if should_schedule {
654 self.render_flush_deadline = Some(deadline);
655 self.schedule_render_flush(Duration::from_millis(remaining_ms));
656 }
657 }
658
659 fn handle_internal_render_flush_event(&mut self) -> Vec<ServerMessage> {
660 if self.coalesced_render_count == 0 {
661 return Vec::new();
662 }
663 self.render_flush_deadline = None;
664
665 let mut messages = Vec::new();
666 self.coalesce_or_render_root_update(&mut messages);
667 messages
668 }
669
670 pub fn render_html(&self) -> Html {
672 self.view.render()
673 }
674
675 pub fn patch_route(
677 &mut self,
678 route_path: impl Into<String>,
679 route_params: BTreeMap<String, String>,
680 ) -> LiveResult {
681 self.ctx.set_route(route_path, route_params);
682 self.view.handle_params(&mut self.ctx)
683 }
684
685 pub fn hello(&self) -> ServerMessage {
687 ServerMessage::Hello {
688 session_id: self.session_id().to_string(),
689 target: self.ctx.target_dom_id().to_string(),
690 revision: self.revision,
691 protocol: PROTOCOL_VERSION.to_string(),
692 server_revision: None,
693 resume_status: None,
694 resume_reason: None,
695 resume_token: None,
696 resume_expires_in_ms: None,
697 }
698 }
699
700 pub fn render_patch(&mut self) -> ServerMessage {
702 self.revision += 1;
703 let html = self.view.render();
704 let html_string = html.as_str().to_string();
705 let bytes = html_string.len();
706 self.last_render = Some(html);
707 let message = ServerMessage::Patch {
708 target: self.ctx.target_dom_id().to_string(),
709 html: html_string,
710 revision: self.revision,
711 };
712 self.emit_telemetry(
713 self.telemetry_event(TelemetryEventKind::Patch)
714 .with_bytes(bytes)
715 .with_count(1),
716 );
717 self.mark_render_sent_now();
718 message
719 }
720
721 pub fn render_update(&mut self) -> ServerMessage {
723 let next = self.view.render();
724 let diff = self.diff_against_last_render(&next);
725 self.revision += 1;
726
727 let message = match diff {
728 Some(slots) => ServerMessage::Diff {
729 target: self.ctx.target_dom_id().to_string(),
730 revision: self.revision,
731 slots,
732 },
733 None => ServerMessage::Patch {
734 target: self.ctx.target_dom_id().to_string(),
735 html: next.as_str().to_string(),
736 revision: self.revision,
737 },
738 };
739
740 match &message {
741 ServerMessage::Diff { slots, .. } => {
742 let bytes = slots.iter().map(|slot| slot.html.len()).sum();
743 self.emit_telemetry(
744 self.telemetry_event(TelemetryEventKind::Diff)
745 .with_bytes(bytes)
746 .with_count(slots.len()),
747 );
748 }
749 ServerMessage::Patch { html, .. } => {
750 self.emit_telemetry(
751 self.telemetry_event(TelemetryEventKind::Patch)
752 .with_bytes(html.len())
753 .with_count(1),
754 );
755 }
756 _ => {}
757 }
758
759 self.last_render = Some(next);
760 self.mark_render_sent_now();
761 message
762 }
763
764 pub fn render_snapshot_patch(&mut self) -> ServerMessage {
769 let html = self.view.render();
770 let html_string = html.as_str().to_string();
771 self.last_render = Some(html);
772 self.emit_telemetry(
773 self.telemetry_event(TelemetryEventKind::Patch)
774 .with_bytes(html_string.len())
775 .with_count(1)
776 .with_attribute("snapshot".to_string(), Value::Bool(true)),
777 );
778 let message = ServerMessage::Patch {
779 target: self.ctx.target_dom_id().to_string(),
780 html: html_string,
781 revision: self.revision,
782 };
783 self.mark_render_sent_now();
784 message
785 }
786
787 pub fn render_component_update(&mut self, render: ComponentRender) -> ServerMessage {
789 self.revision += 1;
790 let (id, html) = render.into_parts();
791 let message = ServerMessage::Patch {
792 target: id.to_string(),
793 html: html.as_str().to_string(),
794 revision: self.revision,
795 };
796 if let ServerMessage::Patch { html, .. } = &message {
797 self.emit_telemetry(
798 self.telemetry_event(TelemetryEventKind::Patch)
799 .with_bytes(html.len())
800 .with_count(1)
801 .with_attribute("component_patch".to_string(), Value::Bool(true)),
802 );
803 }
804 message
805 }
806
807 pub fn handle_client_message(&mut self, message: ClientMessage) -> Vec<ServerMessage> {
809 let started = Instant::now();
810 let revision_before = self.revision;
811 let captured_message = message.clone();
812 if let Some(tenant_id) = tenant_id_for_client_message(&message) {
813 self.set_tenant_id(tenant_id);
814 }
815 let event_name = match &message {
816 ClientMessage::Event { event, .. } if event != INTERNAL_RENDER_FLUSH_EVENT => {
817 Some(event.clone())
818 }
819 _ => None,
820 };
821
822 let messages = match message {
823 ClientMessage::Connect { protocol, .. } => {
824 if protocol == PROTOCOL_VERSION {
825 Vec::new()
826 } else {
827 vec![ServerMessage::Error {
828 message: format!(
829 "unsupported protocol in connect: expected {PROTOCOL_VERSION}, got {protocol}"
830 ),
831 code: Some("unsupported_protocol".to_string()),
832 }]
833 }
834 }
835 ClientMessage::Ping { nonce } => vec![ServerMessage::Pong { nonce }],
836 ClientMessage::PatchUrl { to } => vec![ServerMessage::PatchUrl { to }],
837 ClientMessage::Navigate { to } => vec![ServerMessage::Navigate { to }],
838 ClientMessage::UploadStart { .. }
839 | ClientMessage::UploadChunk { .. }
840 | ClientMessage::UploadComplete { .. } => vec![ServerMessage::Error {
841 message: "upload protocol messages must be handled by a transport adapter"
842 .to_string(),
843 code: Some("unsupported_upload_transport".to_string()),
844 }],
845 event_message @ ClientMessage::Event { .. } => {
846 let is_internal_flush = matches!(
847 &event_message,
848 ClientMessage::Event { event, target, .. }
849 if event == INTERNAL_RENDER_FLUSH_EVENT && target.is_none()
850 );
851 if is_internal_flush {
852 self.handle_internal_render_flush_event()
853 } else {
854 match Event::try_from(event_message) {
855 Err(err) => vec![ServerMessage::Error {
856 message: err.to_string(),
857 code: Some("invalid_event".to_string()),
858 }],
859 Ok(event) => {
860 if let Some(target) = event.target.clone() {
861 if let Some(child_id) = self.child_id_for_target(&target) {
862 self.handle_child_event(child_id, event)
863 } else {
864 match self.view.handle_component_event(
865 &target,
866 event.clone(),
867 &mut self.ctx,
868 ) {
869 Ok(Some(render)) => {
870 let mut messages = self.ctx.drain_pushes();
871 if self.ctx.take_render_after_event() {
872 messages.push(self.render_component_update(render));
873 }
874 messages
875 }
876 Ok(None) => {
877 if let Err(err) =
878 self.view.handle_event(event, &mut self.ctx)
879 {
880 let _ = self.ctx.drain_pushes();
881 let _ = self.ctx.drain_pubsub_commands();
882 let _ = self.ctx.drain_runtime_commands();
883 let _ = self.ctx.take_render_after_event();
884 vec![ServerMessage::Error {
885 message: err.to_string(),
886 code: Some("event_failed".to_string()),
887 }]
888 } else {
889 let mut messages = self.ctx.drain_pushes();
890 if self.ctx.take_render_after_event() {
891 self.coalesce_or_render_root_update(
892 &mut messages,
893 );
894 }
895 messages
896 }
897 }
898 Err(err) => {
899 let _ = self.ctx.drain_pushes();
900 let _ = self.ctx.drain_pubsub_commands();
901 let _ = self.ctx.drain_runtime_commands();
902 let _ = self.ctx.take_render_after_event();
903 vec![ServerMessage::Error {
904 message: err.to_string(),
905 code: Some("event_failed".to_string()),
906 }]
907 }
908 }
909 }
910 } else if let Err(err) = self.view.handle_event(event, &mut self.ctx) {
911 let _ = self.ctx.drain_pushes();
912 let _ = self.ctx.drain_pubsub_commands();
913 let _ = self.ctx.drain_runtime_commands();
914 let _ = self.ctx.take_render_after_event();
915 vec![ServerMessage::Error {
916 message: err.to_string(),
917 code: Some("event_failed".to_string()),
918 }]
919 } else {
920 let mut messages = self.ctx.drain_pushes();
921 if self.ctx.take_render_after_event() {
922 self.coalesce_or_render_root_update(&mut messages);
923 }
924 messages
925 }
926 }
927 }
928 }
929 }
930 };
931
932 for message in &messages {
933 match message {
934 ServerMessage::StreamInsert { .. } => {
935 self.emit_telemetry(
936 self.telemetry_event(TelemetryEventKind::StreamInsert)
937 .with_count(1),
938 );
939 }
940 ServerMessage::StreamDelete { .. } => {
941 self.emit_telemetry(
942 self.telemetry_event(TelemetryEventKind::StreamDelete)
943 .with_count(1),
944 );
945 }
946 ServerMessage::StreamBatch { operations, .. } => {
947 let inserts = operations
948 .iter()
949 .filter(|operation| {
950 matches!(operation, crate::StreamBatchOperation::Insert { .. })
951 })
952 .count();
953 let deletes = operations
954 .iter()
955 .filter(|operation| {
956 matches!(operation, crate::StreamBatchOperation::Delete { .. })
957 })
958 .count();
959 if inserts > 0 {
960 self.emit_telemetry(
961 self.telemetry_event(TelemetryEventKind::StreamInsert)
962 .with_count(inserts),
963 );
964 }
965 if deletes > 0 {
966 self.emit_telemetry(
967 self.telemetry_event(TelemetryEventKind::StreamDelete)
968 .with_count(deletes),
969 );
970 }
971 }
972 ServerMessage::Error { .. } => {
973 self.emit_telemetry(
974 self.telemetry_event(TelemetryEventKind::Error)
975 .with_ok(false)
976 .with_count(1),
977 );
978 }
979 _ => {}
980 }
981 }
982
983 let ok = messages
984 .iter()
985 .all(|message| !matches!(message, ServerMessage::Error { .. }));
986 let mut event_telemetry = self
987 .telemetry_event(TelemetryEventKind::HandleEvent)
988 .with_ok(ok)
989 .with_latency_ms(started.elapsed().as_millis() as u64)
990 .with_count(messages.len());
991 if let Some(name) = event_name {
992 event_telemetry = event_telemetry.with_event_name(name);
993 }
994 self.emit_telemetry(event_telemetry);
995
996 if let Some(recorder) = self.trace_recorder.as_mut() {
997 recorder.record_turn(&captured_message, &messages, revision_before, self.revision);
998 }
999
1000 messages
1001 }
1002
1003 fn diff_against_last_render(&self, next: &Html) -> Option<Vec<DynamicSlotPatch>> {
1004 diff_template(self.last_render.as_ref(), next)
1005 }
1006}
1007
1008fn diff_template(previous: Option<&Html>, next: &Html) -> Option<Vec<DynamicSlotPatch>> {
1009 let previous = previous?.template()?;
1010 let next_template = next.template()?;
1011 if !previous.compatible_with(next_template) {
1012 return None;
1013 }
1014
1015 Some(
1016 previous
1017 .dynamic_segments()
1018 .iter()
1019 .zip(next_template.dynamic_segments())
1020 .enumerate()
1021 .filter_map(|(index, (before, after))| {
1022 if before == after {
1023 None
1024 } else {
1025 Some(DynamicSlotPatch {
1026 index,
1027 html: after.clone(),
1028 })
1029 }
1030 })
1031 .collect(),
1032 )
1033}
1034
1035fn tenant_id_for_client_message(message: &ClientMessage) -> Option<String> {
1036 match message {
1037 ClientMessage::Connect { tenant_id, .. } => normalize_tenant_id(tenant_id.as_deref()),
1038 ClientMessage::Event {
1039 metadata, value, ..
1040 } => metadata
1041 .get("tenant_id")
1042 .and_then(Value::as_str)
1043 .or_else(|| value.get("tenant_id").and_then(Value::as_str))
1044 .and_then(|value| normalize_tenant_id(Some(value))),
1045 _ => None,
1046 }
1047}
1048
1049fn normalize_tenant_id(value: Option<&str>) -> Option<String> {
1050 value
1051 .map(str::trim)
1052 .filter(|value| !value.is_empty())
1053 .map(ToString::to_string)
1054}
1055
1056impl Drop for LiveSession {
1057 fn drop(&mut self) {
1058 self.ctx.set_connected(false);
1059 }
1060}
1061
1062#[cfg(test)]
1063mod tests {
1064 use super::{LiveSession, INTERNAL_RENDER_FLUSH_EVENT};
1065 use crate::{
1066 ClientMessage, ComponentId, ComponentRender, Context, DynamicSlotPatch, Event, Html,
1067 LiveComponent, LiveResult, LiveView, MemoryTelemetrySink, NestedLiveViewState,
1068 RuntimeCommand, ServerMessage, ShellyError, StreamBatchOperation, TelemetryEvent,
1069 TelemetryEventKind, TelemetrySink, Template, TraceRedactionPolicy,
1070 };
1071 use serde_json::{json, Value};
1072 use std::{collections::BTreeMap, sync::Arc, thread, time::Duration};
1073
1074 #[derive(Default)]
1075 struct Counter {
1076 count: i64,
1077 }
1078
1079 impl LiveView for Counter {
1080 fn handle_event(&mut self, event: Event, _ctx: &mut Context) -> LiveResult {
1081 match event.name.as_str() {
1082 "inc" => self.count += 1,
1083 "dec" => self.count -= 1,
1084 _ => {}
1085 }
1086
1087 Ok(())
1088 }
1089
1090 fn render(&self) -> Html {
1091 Html::new(format!("<p>{}</p>", self.count))
1092 }
1093 }
1094
1095 #[test]
1096 fn session_dispatches_event_and_patches() {
1097 let mut session = LiveSession::new(Box::<Counter>::default(), "root");
1098 session.mount().unwrap();
1099
1100 let messages = session.handle_client_message(ClientMessage::Event {
1101 event: "inc".to_string(),
1102 target: None,
1103 value: Value::Null,
1104 metadata: Default::default(),
1105 });
1106
1107 assert_eq!(messages.len(), 1);
1108 assert_eq!(
1109 messages[0],
1110 ServerMessage::Patch {
1111 target: "root".to_string(),
1112 html: "<p>1</p>".to_string(),
1113 revision: 1,
1114 }
1115 );
1116 }
1117
1118 #[test]
1119 fn session_responds_to_ping_without_rendering() {
1120 let mut session = LiveSession::new(Box::<Counter>::default(), "root");
1121 let messages = session.handle_client_message(ClientMessage::Ping {
1122 nonce: Some("abc".to_string()),
1123 });
1124
1125 assert_eq!(
1126 messages,
1127 vec![ServerMessage::Pong {
1128 nonce: Some("abc".to_string())
1129 }]
1130 );
1131 assert_eq!(session.revision(), 0);
1132 }
1133
1134 #[test]
1135 fn connect_with_supported_protocol_is_noop() {
1136 let mut session = LiveSession::new(Box::<Counter>::default(), "root");
1137 let messages = session.handle_client_message(ClientMessage::Connect {
1138 protocol: "shelly/1".to_string(),
1139 session_id: None,
1140 last_revision: None,
1141 resume_token: None,
1142 tenant_id: None,
1143 trace_id: None,
1144 span_id: None,
1145 parent_span_id: None,
1146 correlation_id: None,
1147 request_id: None,
1148 });
1149 assert!(messages.is_empty());
1150 }
1151
1152 #[test]
1153 fn connect_with_unsupported_protocol_returns_structured_error() {
1154 let mut session = LiveSession::new(Box::<Counter>::default(), "root");
1155 let messages = session.handle_client_message(ClientMessage::Connect {
1156 protocol: "shelly/2".to_string(),
1157 session_id: None,
1158 last_revision: None,
1159 resume_token: None,
1160 tenant_id: None,
1161 trace_id: None,
1162 span_id: None,
1163 parent_span_id: None,
1164 correlation_id: None,
1165 request_id: None,
1166 });
1167 assert_eq!(
1168 messages,
1169 vec![ServerMessage::Error {
1170 message: "unsupported protocol in connect: expected shelly/1, got shelly/2"
1171 .to_string(),
1172 code: Some("unsupported_protocol".to_string()),
1173 }]
1174 );
1175 }
1176
1177 #[test]
1178 fn connect_and_event_propagate_tenant_context() {
1179 let mut session = LiveSession::new(Box::<Counter>::default(), "root");
1180 let _ = session.handle_client_message(ClientMessage::Connect {
1181 protocol: "shelly/1".to_string(),
1182 session_id: None,
1183 last_revision: None,
1184 resume_token: None,
1185 tenant_id: Some("tenant-a".to_string()),
1186 trace_id: None,
1187 span_id: None,
1188 parent_span_id: None,
1189 correlation_id: None,
1190 request_id: None,
1191 });
1192 assert_eq!(session.tenant_id(), Some("tenant-a"));
1193
1194 let _ = session.handle_client_message(ClientMessage::Event {
1195 event: "inc".to_string(),
1196 target: None,
1197 value: json!({"tenant_id": "tenant-b"}),
1198 metadata: Default::default(),
1199 });
1200 assert_eq!(session.tenant_id(), Some("tenant-b"));
1201 }
1202
1203 #[test]
1204 fn unknown_events_do_not_panic() {
1205 let mut session = LiveSession::new(Box::<Counter>::default(), "root");
1206 session.mount().unwrap();
1207
1208 let messages = session.handle_client_message(ClientMessage::Event {
1209 event: "unknown".to_string(),
1210 target: None,
1211 value: Value::Null,
1212 metadata: Default::default(),
1213 });
1214
1215 assert_eq!(
1216 messages,
1217 vec![ServerMessage::Patch {
1218 target: "root".to_string(),
1219 html: "<p>0</p>".to_string(),
1220 revision: 1,
1221 }]
1222 );
1223 }
1224
1225 #[derive(Default)]
1226 struct FailingEvent {
1227 count: i64,
1228 }
1229
1230 impl LiveView for FailingEvent {
1231 fn handle_event(&mut self, event: Event, ctx: &mut Context) -> LiveResult {
1232 match event.name.as_str() {
1233 "fail" => {
1234 ctx.redirect("/should-not-leak");
1235 Err(ShellyError::Event("boom".to_string()))
1236 }
1237 "inc" => {
1238 self.count += 1;
1239 Ok(())
1240 }
1241 _ => Ok(()),
1242 }
1243 }
1244
1245 fn render(&self) -> Html {
1246 Html::new(format!("<p>{}</p>", self.count))
1247 }
1248 }
1249
1250 #[test]
1251 fn event_errors_do_not_increment_revision_or_leak_pushes() {
1252 let mut session = LiveSession::new(Box::<FailingEvent>::default(), "root");
1253 session.mount().unwrap();
1254
1255 let failure = session.handle_client_message(ClientMessage::Event {
1256 event: "fail".to_string(),
1257 target: None,
1258 value: Value::Null,
1259 metadata: Default::default(),
1260 });
1261
1262 assert_eq!(
1263 failure,
1264 vec![ServerMessage::Error {
1265 message: "event error: boom".to_string(),
1266 code: Some("event_failed".to_string()),
1267 }]
1268 );
1269 assert_eq!(session.revision(), 0);
1270
1271 let success = session.handle_client_message(ClientMessage::Event {
1272 event: "inc".to_string(),
1273 target: None,
1274 value: Value::Null,
1275 metadata: Default::default(),
1276 });
1277
1278 assert_eq!(
1279 success,
1280 vec![ServerMessage::Patch {
1281 target: "root".to_string(),
1282 html: "<p>1</p>".to_string(),
1283 revision: 1,
1284 }]
1285 );
1286 }
1287
1288 #[derive(Default)]
1289 struct TemplateCounter {
1290 count: i64,
1291 }
1292
1293 impl LiveView for TemplateCounter {
1294 fn handle_event(&mut self, event: Event, _ctx: &mut Context) -> LiveResult {
1295 if event.name == "inc" {
1296 self.count += 1;
1297 }
1298 Ok(())
1299 }
1300
1301 fn render(&self) -> Html {
1302 Template::new(vec!["<p>Count: ", "</p>"], vec![self.count.to_string()]).into()
1303 }
1304 }
1305
1306 #[test]
1307 fn template_updates_send_only_changed_dynamic_slots() {
1308 let mut session = LiveSession::new(Box::<TemplateCounter>::default(), "root");
1309 session.mount().unwrap();
1310
1311 assert_eq!(
1312 session.render_patch(),
1313 ServerMessage::Patch {
1314 target: "root".to_string(),
1315 html: r#"<p>Count: <span data-shelly-slot="0">0</span></p>"#.to_string(),
1316 revision: 1,
1317 }
1318 );
1319
1320 let update = session.handle_client_message(ClientMessage::Event {
1321 event: "inc".to_string(),
1322 target: None,
1323 value: Value::Null,
1324 metadata: Default::default(),
1325 });
1326
1327 assert_eq!(
1328 update,
1329 vec![ServerMessage::Diff {
1330 target: "root".to_string(),
1331 revision: 2,
1332 slots: vec![DynamicSlotPatch {
1333 index: 0,
1334 html: "1".to_string(),
1335 }],
1336 }]
1337 );
1338 }
1339
1340 #[test]
1341 fn plain_html_keeps_full_patch_fallback() {
1342 let mut session = LiveSession::new(Box::<Counter>::default(), "root");
1343 session.mount().unwrap();
1344 assert!(matches!(
1345 session.render_patch(),
1346 ServerMessage::Patch { .. }
1347 ));
1348
1349 let update = session.handle_client_message(ClientMessage::Event {
1350 event: "inc".to_string(),
1351 target: None,
1352 value: Value::Null,
1353 metadata: Default::default(),
1354 });
1355
1356 assert!(matches!(update.as_slice(), [ServerMessage::Patch { .. }]));
1357 }
1358
1359 #[derive(Default)]
1360 struct StreamList {
1361 next_id: usize,
1362 }
1363
1364 impl LiveView for StreamList {
1365 fn handle_event(&mut self, event: Event, ctx: &mut Context) -> LiveResult {
1366 match event.name.as_str() {
1367 "append" => {
1368 self.next_id += 1;
1369 let id = format!("item-{}", self.next_id);
1370 ctx.stream_append(
1371 "items",
1372 id.clone(),
1373 format!(r#"<li id="{id}">Item {}</li>"#, self.next_id),
1374 );
1375 }
1376 "delete" => ctx.stream_delete("items", "item-1"),
1377 _ => {}
1378 }
1379 Ok(())
1380 }
1381
1382 fn render(&self) -> Html {
1383 Html::new(r#"<ul id="items"></ul>"#)
1384 }
1385 }
1386
1387 #[test]
1388 fn stream_events_send_only_stream_operations() {
1389 let mut session = LiveSession::new(Box::<StreamList>::default(), "root");
1390 session.mount().unwrap();
1391 session.render_patch();
1392
1393 let append = session.handle_client_message(ClientMessage::Event {
1394 event: "append".to_string(),
1395 target: None,
1396 value: Value::Null,
1397 metadata: Default::default(),
1398 });
1399
1400 assert_eq!(
1401 append,
1402 vec![ServerMessage::StreamInsert {
1403 target: "items".to_string(),
1404 id: "item-1".to_string(),
1405 html: r#"<li id="item-1">Item 1</li>"#.to_string(),
1406 at: crate::StreamPosition::Append,
1407 }]
1408 );
1409
1410 let delete = session.handle_client_message(ClientMessage::Event {
1411 event: "delete".to_string(),
1412 target: None,
1413 value: Value::Null,
1414 metadata: Default::default(),
1415 });
1416
1417 assert_eq!(
1418 delete,
1419 vec![ServerMessage::StreamDelete {
1420 target: "items".to_string(),
1421 id: "item-1".to_string(),
1422 }]
1423 );
1424 }
1425
1426 #[derive(Default)]
1427 struct StreamBatchList {
1428 next_id: usize,
1429 }
1430
1431 impl LiveView for StreamBatchList {
1432 fn handle_event(&mut self, event: Event, ctx: &mut Context) -> LiveResult {
1433 if event.name == "burst_append" {
1434 let mut items = Vec::new();
1435 for _ in 0..3 {
1436 self.next_id += 1;
1437 let id = format!("item-{}", self.next_id);
1438 let html = format!(r#"<li id="{id}">Item {}</li>"#, self.next_id);
1439 items.push((id, html));
1440 }
1441 ctx.stream_append_many("items", items);
1442 }
1443 Ok(())
1444 }
1445
1446 fn render(&self) -> Html {
1447 Html::new(r#"<ul id="items"></ul>"#)
1448 }
1449 }
1450
1451 #[test]
1452 fn stream_burst_events_send_batch_operations() {
1453 let mut session = LiveSession::new(Box::<StreamBatchList>::default(), "root");
1454 session.mount().unwrap();
1455 session.render_patch();
1456
1457 let messages = session.handle_client_message(ClientMessage::Event {
1458 event: "burst_append".to_string(),
1459 target: None,
1460 value: Value::Null,
1461 metadata: Default::default(),
1462 });
1463
1464 assert_eq!(messages.len(), 1);
1465 assert_eq!(
1466 messages[0],
1467 ServerMessage::StreamBatch {
1468 target: "items".to_string(),
1469 operations: vec![
1470 StreamBatchOperation::Insert {
1471 id: "item-1".to_string(),
1472 html: r#"<li id="item-1">Item 1</li>"#.to_string(),
1473 at: crate::StreamPosition::Append,
1474 },
1475 StreamBatchOperation::Insert {
1476 id: "item-2".to_string(),
1477 html: r#"<li id="item-2">Item 2</li>"#.to_string(),
1478 at: crate::StreamPosition::Append,
1479 },
1480 StreamBatchOperation::Insert {
1481 id: "item-3".to_string(),
1482 html: r#"<li id="item-3">Item 3</li>"#.to_string(),
1483 at: crate::StreamPosition::Append,
1484 },
1485 ],
1486 }
1487 );
1488 }
1489
1490 #[derive(Default)]
1491 struct RuntimeScheduler;
1492
1493 impl LiveView for RuntimeScheduler {
1494 fn mount(&mut self, ctx: &mut Context) -> LiveResult {
1495 ctx.schedule_interval("heartbeat", 1000, "tick", Value::Null);
1496 Ok(())
1497 }
1498
1499 fn render(&self) -> Html {
1500 Html::new("<p>scheduler</p>")
1501 }
1502 }
1503
1504 #[test]
1505 fn mount_can_queue_runtime_commands() {
1506 let mut session = LiveSession::new(Box::<RuntimeScheduler>::default(), "root");
1507 session.mount().unwrap();
1508
1509 assert_eq!(
1510 session.drain_runtime_commands(),
1511 vec![RuntimeCommand::ScheduleInterval {
1512 id: "heartbeat".to_string(),
1513 every_ms: 1000,
1514 dispatch: crate::RuntimeEvent::new("tick", Value::Null),
1515 }]
1516 );
1517 }
1518
1519 #[derive(Default)]
1520 struct CadencedCounter {
1521 count: i64,
1522 }
1523
1524 impl LiveView for CadencedCounter {
1525 fn mount(&mut self, ctx: &mut Context) -> LiveResult {
1526 ctx.set_render_cadence_ms(25);
1527 Ok(())
1528 }
1529
1530 fn handle_event(&mut self, event: Event, _ctx: &mut Context) -> LiveResult {
1531 if event.name == "inc" {
1532 self.count += 1;
1533 }
1534 Ok(())
1535 }
1536
1537 fn render(&self) -> Html {
1538 Html::new(format!("<p>{}</p>", self.count))
1539 }
1540 }
1541
1542 #[test]
1543 fn render_cadence_coalesces_bursty_root_patches() {
1544 let mut session = LiveSession::new(Box::<CadencedCounter>::default(), "root");
1545 session.mount().unwrap();
1546
1547 let first = session.handle_client_message(ClientMessage::Event {
1548 event: "inc".to_string(),
1549 target: None,
1550 value: Value::Null,
1551 metadata: Default::default(),
1552 });
1553 assert!(matches!(first.as_slice(), [ServerMessage::Patch { .. }]));
1554
1555 let second = session.handle_client_message(ClientMessage::Event {
1556 event: "inc".to_string(),
1557 target: None,
1558 value: Value::Null,
1559 metadata: Default::default(),
1560 });
1561 assert!(second.is_empty());
1562
1563 let runtime = session.drain_runtime_commands();
1564 assert!(runtime.iter().any(|command| matches!(
1565 command,
1566 RuntimeCommand::ScheduleOnce { dispatch, .. }
1567 if dispatch.event == INTERNAL_RENDER_FLUSH_EVENT
1568 )));
1569
1570 thread::sleep(Duration::from_millis(30));
1571 let flush = session.handle_client_message(ClientMessage::Event {
1572 event: INTERNAL_RENDER_FLUSH_EVENT.to_string(),
1573 target: None,
1574 value: Value::Null,
1575 metadata: Default::default(),
1576 });
1577 assert!(matches!(
1578 flush.as_slice(),
1579 [ServerMessage::Patch { html, revision, .. }]
1580 if html == "<p>2</p>" && *revision == 2
1581 ));
1582 }
1583
1584 #[derive(Default)]
1585 struct CadenceOverrideCounter {
1586 count: i64,
1587 }
1588
1589 impl LiveView for CadenceOverrideCounter {
1590 fn mount(&mut self, ctx: &mut Context) -> LiveResult {
1591 ctx.set_render_cadence_ms(100);
1592 Ok(())
1593 }
1594
1595 fn handle_event(&mut self, event: Event, ctx: &mut Context) -> LiveResult {
1596 match event.name.as_str() {
1597 "inc" => self.count += 1,
1598 "disable_cadence" => {
1599 ctx.clear_render_cadence_override();
1600 self.count += 1;
1601 }
1602 _ => {}
1603 }
1604 Ok(())
1605 }
1606
1607 fn render(&self) -> Html {
1608 Html::new(format!("<p>{}</p>", self.count))
1609 }
1610 }
1611
1612 #[test]
1613 fn per_view_override_can_disable_render_cadence() {
1614 let mut session = LiveSession::new(Box::<CadenceOverrideCounter>::default(), "root");
1615 session.mount().unwrap();
1616
1617 let first = session.handle_client_message(ClientMessage::Event {
1618 event: "inc".to_string(),
1619 target: None,
1620 value: Value::Null,
1621 metadata: Default::default(),
1622 });
1623 assert!(matches!(first.as_slice(), [ServerMessage::Patch { .. }]));
1624
1625 let coalesced = session.handle_client_message(ClientMessage::Event {
1626 event: "inc".to_string(),
1627 target: None,
1628 value: Value::Null,
1629 metadata: Default::default(),
1630 });
1631 assert!(coalesced.is_empty());
1632
1633 let disabled = session.handle_client_message(ClientMessage::Event {
1634 event: "disable_cadence".to_string(),
1635 target: None,
1636 value: Value::Null,
1637 metadata: Default::default(),
1638 });
1639 assert!(matches!(
1640 disabled.as_slice(),
1641 [ServerMessage::Patch { html, revision, .. }]
1642 if html == "<p>3</p>" && *revision == 2
1643 ));
1644 }
1645
1646 struct FailingTelemetrySink;
1647
1648 impl TelemetrySink for FailingTelemetrySink {
1649 fn emit(&self, _event: TelemetryEvent) -> Result<(), String> {
1650 Err("telemetry downstream unavailable".to_string())
1651 }
1652 }
1653
1654 #[test]
1655 fn telemetry_sink_failures_do_not_break_runtime_flow() {
1656 let mut session = LiveSession::new(Box::<Counter>::default(), "root");
1657 session.set_telemetry_sink(Arc::new(FailingTelemetrySink));
1658 session.mount().unwrap();
1659
1660 let messages = session.handle_client_message(ClientMessage::Event {
1661 event: "inc".to_string(),
1662 target: None,
1663 value: Value::Null,
1664 metadata: Default::default(),
1665 });
1666
1667 assert!(matches!(messages.as_slice(), [ServerMessage::Patch { .. }]));
1668 }
1669
1670 struct TodoItem {
1671 id: ComponentId,
1672 title: String,
1673 done: bool,
1674 }
1675
1676 impl TodoItem {
1677 fn new(id: &str, title: &str) -> Self {
1678 Self {
1679 id: ComponentId::new(id),
1680 title: title.to_string(),
1681 done: false,
1682 }
1683 }
1684 }
1685
1686 impl LiveComponent for TodoItem {
1687 fn id(&self) -> ComponentId {
1688 self.id.clone()
1689 }
1690
1691 fn handle_event(&mut self, event: Event, _ctx: &mut Context) -> LiveResult {
1692 if event.name == "toggle" {
1693 self.done = !self.done;
1694 }
1695 Ok(())
1696 }
1697
1698 fn render(&self) -> Html {
1699 Html::new(format!(
1700 r#"<li id="{id}" data-done="{done}"><span>{title}</span><button shelly-click="toggle" shelly-target="{id}">Toggle</button></li>"#,
1701 id = self.id,
1702 done = self.done,
1703 title = self.title,
1704 ))
1705 }
1706 }
1707
1708 struct TodoList {
1709 items: Vec<TodoItem>,
1710 }
1711
1712 impl Default for TodoList {
1713 fn default() -> Self {
1714 Self {
1715 items: vec![
1716 TodoItem::new("todo-1", "Write docs"),
1717 TodoItem::new("todo-2", "Ship components"),
1718 ],
1719 }
1720 }
1721 }
1722
1723 impl LiveView for TodoList {
1724 fn handle_component_event(
1725 &mut self,
1726 target: &str,
1727 event: Event,
1728 ctx: &mut Context,
1729 ) -> LiveResult<Option<ComponentRender>> {
1730 let Some(item) = self
1731 .items
1732 .iter_mut()
1733 .find(|item| item.id().as_str() == target)
1734 else {
1735 return Ok(None);
1736 };
1737
1738 item.handle_event(event, ctx)?;
1739 Ok(Some(ComponentRender::new(item.id(), item.render())))
1740 }
1741
1742 fn render(&self) -> Html {
1743 let items = self
1744 .items
1745 .iter()
1746 .map(|item| item.render().into_string())
1747 .collect::<String>();
1748 Html::new(format!(r#"<ul>{items}</ul>"#))
1749 }
1750 }
1751
1752 #[test]
1753 fn scoped_component_event_patches_only_target_component() {
1754 let mut session = LiveSession::new(Box::<TodoList>::default(), "root");
1755 session.mount().unwrap();
1756
1757 let initial = session.render_patch();
1758 assert!(matches!(
1759 initial,
1760 ServerMessage::Patch { ref html, .. } if html.contains("todo-1")
1761 ));
1762
1763 let update = session.handle_client_message(ClientMessage::Event {
1764 event: "toggle".to_string(),
1765 target: Some("todo-2".to_string()),
1766 value: Value::Null,
1767 metadata: Default::default(),
1768 });
1769
1770 assert_eq!(update.len(), 1);
1771 assert_eq!(
1772 update[0],
1773 ServerMessage::Patch {
1774 target: "todo-2".to_string(),
1775 html: r#"<li id="todo-2" data-done="true"><span>Ship components</span><button shelly-click="toggle" shelly-target="todo-2">Toggle</button></li>"#.to_string(),
1776 revision: 2,
1777 }
1778 );
1779 }
1780
1781 #[derive(Default)]
1782 struct ChildCounter {
1783 count: i64,
1784 }
1785
1786 impl LiveView for ChildCounter {
1787 fn mount(&mut self, ctx: &mut Context) -> LiveResult {
1788 ctx.schedule_once_to("child-tick", 10, "child-a", "tick", Value::Null);
1789 Ok(())
1790 }
1791
1792 fn handle_event(&mut self, event: Event, _ctx: &mut Context) -> LiveResult {
1793 match event.name.as_str() {
1794 "inc" => self.count += 1,
1795 "fail" => return Err(ShellyError::Event("child boom".to_string())),
1796 _ => {}
1797 }
1798 Ok(())
1799 }
1800
1801 fn render(&self) -> Html {
1802 Html::new(format!(
1803 r#"<section id="child-a">child:{}</section>"#,
1804 self.count
1805 ))
1806 }
1807 }
1808
1809 #[test]
1810 fn nested_live_view_mounts_and_handles_scoped_events_without_parent_remount() {
1811 let mut session = LiveSession::new(Box::<Counter>::default(), "root");
1812 session.mount().unwrap();
1813
1814 let child_patch = session
1815 .mount_child("child-a", Box::<ChildCounter>::default(), "child-a")
1816 .expect("child should mount");
1817 assert_eq!(
1818 child_patch,
1819 ServerMessage::Patch {
1820 target: "child-a".to_string(),
1821 html: r#"<section id="child-a">child:0</section>"#.to_string(),
1822 revision: 1,
1823 }
1824 );
1825 assert_eq!(
1826 session.child_state("child-a"),
1827 Some(NestedLiveViewState::Mounted)
1828 );
1829 assert_eq!(
1830 session.drain_runtime_commands(),
1831 vec![RuntimeCommand::ScheduleOnce {
1832 id: "child-tick".to_string(),
1833 delay_ms: 10,
1834 dispatch: crate::RuntimeEvent::with_target("child-a", "tick", Value::Null),
1835 }]
1836 );
1837
1838 let update = session.handle_client_message(ClientMessage::Event {
1839 event: "inc".to_string(),
1840 target: Some("child-a".to_string()),
1841 value: Value::Null,
1842 metadata: Default::default(),
1843 });
1844
1845 assert_eq!(
1846 update,
1847 vec![ServerMessage::Patch {
1848 target: "child-a".to_string(),
1849 html: r#"<section id="child-a">child:1</section>"#.to_string(),
1850 revision: 2,
1851 }]
1852 );
1853
1854 let parent_snapshot = session.render_snapshot_patch();
1855 assert!(matches!(
1856 parent_snapshot,
1857 ServerMessage::Patch { html, revision, .. }
1858 if html == "<p>0</p>" && revision == 2
1859 ));
1860 }
1861
1862 #[test]
1863 fn nested_live_view_lifecycle_suspend_resume_and_terminate_are_isolated() {
1864 let mut session = LiveSession::new(Box::<Counter>::default(), "root");
1865 session.mount().unwrap();
1866 let _ = session
1867 .mount_child("child-a", Box::<ChildCounter>::default(), "child-a")
1868 .expect("child should mount");
1869
1870 session.suspend_child("child-a").expect("suspend child");
1871 assert_eq!(
1872 session.child_state("child-a"),
1873 Some(NestedLiveViewState::Suspended)
1874 );
1875 let blocked = session.handle_client_message(ClientMessage::Event {
1876 event: "inc".to_string(),
1877 target: Some("child-a".to_string()),
1878 value: Value::Null,
1879 metadata: Default::default(),
1880 });
1881 assert!(matches!(
1882 blocked.as_slice(),
1883 [ServerMessage::Error { code: Some(code), .. }]
1884 if code == "nested_live_view_unavailable"
1885 ));
1886 assert_eq!(session.revision(), 1);
1887
1888 let resumed = session.resume_child("child-a").expect("resume child");
1889 assert!(matches!(
1890 resumed,
1891 ServerMessage::Patch { target, revision, .. }
1892 if target == "child-a" && revision == 2
1893 ));
1894
1895 let terminated = session.terminate_child("child-a").expect("terminate child");
1896 assert_eq!(
1897 terminated,
1898 ServerMessage::Patch {
1899 target: "child-a".to_string(),
1900 html: String::new(),
1901 revision: 3,
1902 }
1903 );
1904 assert_eq!(
1905 session.child_state("child-a"),
1906 Some(NestedLiveViewState::Terminated)
1907 );
1908 }
1909
1910 #[test]
1911 fn nested_live_view_event_failure_does_not_corrupt_parent_or_siblings() {
1912 let mut session = LiveSession::new(Box::<Counter>::default(), "root");
1913 session.mount().unwrap();
1914 let _ = session
1915 .mount_child("child-a", Box::<ChildCounter>::default(), "child-a")
1916 .expect("child a should mount");
1917 let _ = session
1918 .mount_child("child-b", Box::<ChildCounter>::default(), "child-b")
1919 .expect("child b should mount");
1920
1921 let failure = session.handle_client_message(ClientMessage::Event {
1922 event: "fail".to_string(),
1923 target: Some("child-a".to_string()),
1924 value: Value::Null,
1925 metadata: Default::default(),
1926 });
1927 assert!(matches!(
1928 failure.as_slice(),
1929 [ServerMessage::Error { code: Some(code), .. }]
1930 if code == "nested_event_failed"
1931 ));
1932 assert_eq!(session.revision(), 2);
1933
1934 let sibling = session.handle_client_message(ClientMessage::Event {
1935 event: "inc".to_string(),
1936 target: Some("child-b".to_string()),
1937 value: Value::Null,
1938 metadata: Default::default(),
1939 });
1940 assert!(matches!(
1941 sibling.as_slice(),
1942 [ServerMessage::Patch { target, revision, .. }]
1943 if target == "child-b" && *revision == 3
1944 ));
1945
1946 let parent = session.handle_client_message(ClientMessage::Event {
1947 event: "inc".to_string(),
1948 target: None,
1949 value: Value::Null,
1950 metadata: Default::default(),
1951 });
1952 assert!(matches!(
1953 parent.as_slice(),
1954 [ServerMessage::Patch { target, html, revision }]
1955 if target == "root" && html == "<p>1</p>" && *revision == 4
1956 ));
1957 }
1958
1959 #[test]
1960 fn session_emits_mount_and_event_telemetry() {
1961 let telemetry = Arc::new(MemoryTelemetrySink::new());
1962 let mut session = LiveSession::new(Box::<Counter>::default(), "root");
1963 session.set_telemetry_sink(telemetry.clone());
1964 session.mount().unwrap();
1965 let _ = session.handle_client_message(ClientMessage::Event {
1966 event: "inc".to_string(),
1967 target: None,
1968 value: Value::Null,
1969 metadata: Default::default(),
1970 });
1971
1972 let events = telemetry.events();
1973 assert!(events
1974 .iter()
1975 .any(|event| event.kind == TelemetryEventKind::Mount));
1976 assert!(events
1977 .iter()
1978 .any(|event| event.kind == TelemetryEventKind::HandleEvent));
1979 assert!(events
1980 .iter()
1981 .any(|event| event.kind == TelemetryEventKind::Patch));
1982 }
1983
1984 #[test]
1985 fn session_public_helpers_cover_route_hello_snapshot_and_trace_paths() {
1986 let mut route_params = BTreeMap::new();
1987 route_params.insert("team".to_string(), "core".to_string());
1988 let mut session = LiveSession::new_with_route(
1989 Box::<Counter>::default(),
1990 "root",
1991 "/teams/core",
1992 route_params.clone(),
1993 );
1994
1995 assert_eq!(session.route_path(), "/teams/core");
1996 assert_eq!(session.route_params(), &route_params);
1997 assert_eq!(session.render_html().as_str(), "<p>0</p>");
1998
1999 session.set_default_render_cadence_ms(10);
2000 let hello = session.hello();
2001 assert!(matches!(
2002 hello,
2003 ServerMessage::Hello { ref target, revision, .. } if target == "root" && revision == 0
2004 ));
2005
2006 let mut patched_params = BTreeMap::new();
2007 patched_params.insert("team".to_string(), "platform".to_string());
2008 session
2009 .patch_route("/teams/platform", patched_params.clone())
2010 .unwrap();
2011 assert_eq!(session.route_path(), "/teams/platform");
2012 assert_eq!(session.route_params(), &patched_params);
2013
2014 session.enable_trace_capture(TraceRedactionPolicy::none());
2015 assert!(session.trace_capture_enabled());
2016 let _ = session.handle_client_message(ClientMessage::Event {
2017 event: "inc".to_string(),
2018 target: None,
2019 value: Value::Null,
2020 metadata: Default::default(),
2021 });
2022 assert!(session.trace_artifact().is_some());
2023
2024 let snapshot = session.render_snapshot_patch();
2025 assert!(matches!(
2026 snapshot,
2027 ServerMessage::Patch { ref html, revision, .. } if html == "<p>1</p>" && revision == session.revision()
2028 ));
2029
2030 let artifact = session.take_trace_artifact();
2031 assert!(artifact.is_some());
2032 assert!(!session.trace_capture_enabled());
2033 session.disable_trace_capture();
2034 assert!(session.trace_artifact().is_none());
2035 }
2036
2037 #[derive(Default)]
2038 struct MountFailureView;
2039
2040 impl LiveView for MountFailureView {
2041 fn mount(&mut self, ctx: &mut Context) -> LiveResult {
2042 ctx.redirect("/mount-failure");
2043 ctx.subscribe("alerts");
2044 ctx.schedule_once("retry", 10, "retry", Value::Null);
2045 Err(ShellyError::Event("mount failed".to_string()))
2046 }
2047
2048 fn render(&self) -> Html {
2049 Html::new("<p>mount-failure</p>")
2050 }
2051 }
2052
2053 #[test]
2054 fn mount_failure_drains_side_effects_and_keeps_session_safe() {
2055 let mut session = LiveSession::new(Box::<MountFailureView>::default(), "root");
2056 let err = session.mount().unwrap_err().to_string();
2057 assert!(err.contains("mount failed"));
2058 assert!(session.drain_pubsub_commands().is_empty());
2059 assert!(session.drain_runtime_commands().is_empty());
2060 assert_eq!(session.revision(), 0);
2061 }
2062
2063 #[derive(Default)]
2064 struct ComponentErrorView;
2065
2066 impl LiveView for ComponentErrorView {
2067 fn handle_component_event(
2068 &mut self,
2069 _target: &str,
2070 _event: Event,
2071 _ctx: &mut Context,
2072 ) -> LiveResult<Option<ComponentRender>> {
2073 Err(ShellyError::Event("component handler failed".to_string()))
2074 }
2075
2076 fn render(&self) -> Html {
2077 Html::new("<p>component</p>")
2078 }
2079 }
2080
2081 #[test]
2082 fn protocol_paths_cover_navigation_upload_and_component_error_branches() {
2083 let mut session = LiveSession::new(Box::<Counter>::default(), "root");
2084 session.mount().unwrap();
2085
2086 assert_eq!(
2087 session.handle_client_message(ClientMessage::PatchUrl {
2088 to: "/inbox".to_string()
2089 }),
2090 vec![ServerMessage::PatchUrl {
2091 to: "/inbox".to_string()
2092 }]
2093 );
2094 assert_eq!(
2095 session.handle_client_message(ClientMessage::Navigate {
2096 to: "/settings".to_string()
2097 }),
2098 vec![ServerMessage::Navigate {
2099 to: "/settings".to_string()
2100 }]
2101 );
2102
2103 for message in [
2104 ClientMessage::UploadStart {
2105 upload_id: "up-1".to_string(),
2106 event: "upload".to_string(),
2107 target: None,
2108 name: "avatar.png".to_string(),
2109 size: 12,
2110 content_type: Some("image/png".to_string()),
2111 },
2112 ClientMessage::UploadChunk {
2113 upload_id: "up-1".to_string(),
2114 offset: 0,
2115 data: "aaa".to_string(),
2116 },
2117 ClientMessage::UploadComplete {
2118 upload_id: "up-1".to_string(),
2119 },
2120 ] {
2121 let response = session.handle_client_message(message);
2122 assert!(matches!(
2123 response.as_slice(),
2124 [ServerMessage::Error { code: Some(code), .. }] if code == "unsupported_upload_transport"
2125 ));
2126 }
2127
2128 let _ = session.handle_client_message(ClientMessage::Connect {
2129 protocol: "shelly/1".to_string(),
2130 session_id: None,
2131 last_revision: None,
2132 resume_token: None,
2133 tenant_id: Some(" ".to_string()),
2134 trace_id: None,
2135 span_id: None,
2136 parent_span_id: None,
2137 correlation_id: None,
2138 request_id: None,
2139 });
2140 assert_eq!(session.tenant_id(), None);
2141
2142 let mut component_session = LiveSession::new(Box::<ComponentErrorView>::default(), "root");
2143 component_session.mount().unwrap();
2144 let component_failure = component_session.handle_client_message(ClientMessage::Event {
2145 event: "toggle".to_string(),
2146 target: Some("todo-1".to_string()),
2147 value: Value::Null,
2148 metadata: Default::default(),
2149 });
2150 assert!(matches!(
2151 component_failure.as_slice(),
2152 [ServerMessage::Error { code: Some(code), .. }] if code == "event_failed"
2153 ));
2154 }
2155
2156 #[derive(Default)]
2157 struct StreamInsertDeleteBatchView;
2158
2159 impl LiveView for StreamInsertDeleteBatchView {
2160 fn handle_event(&mut self, event: Event, ctx: &mut Context) -> LiveResult {
2161 if event.name == "batch" {
2162 ctx.stream_batch(
2163 "items",
2164 vec![
2165 StreamBatchOperation::Insert {
2166 id: "item-1".to_string(),
2167 html: "<li>Item 1</li>".to_string(),
2168 at: crate::StreamPosition::Append,
2169 },
2170 StreamBatchOperation::Delete {
2171 id: "item-0".to_string(),
2172 },
2173 ],
2174 );
2175 }
2176 Ok(())
2177 }
2178
2179 fn render(&self) -> Html {
2180 Html::new("<ul id=\"items\"></ul>")
2181 }
2182 }
2183
2184 #[test]
2185 fn stream_batch_delete_telemetry_and_empty_flush_paths_are_exercised() {
2186 let telemetry = Arc::new(MemoryTelemetrySink::new());
2187 let mut session = LiveSession::new(Box::<StreamInsertDeleteBatchView>::default(), "root");
2188 session.set_telemetry_sink(telemetry.clone());
2189 session.mount().unwrap();
2190 let _ = session.render_patch();
2191
2192 let batch = session.handle_client_message(ClientMessage::Event {
2193 event: "batch".to_string(),
2194 target: None,
2195 value: Value::Null,
2196 metadata: Default::default(),
2197 });
2198 assert!(matches!(
2199 batch.as_slice(),
2200 [ServerMessage::StreamBatch { .. }]
2201 ));
2202
2203 let empty_flush = session.handle_client_message(ClientMessage::Event {
2204 event: INTERNAL_RENDER_FLUSH_EVENT.to_string(),
2205 target: None,
2206 value: Value::Null,
2207 metadata: Default::default(),
2208 });
2209 assert!(empty_flush.is_empty());
2210
2211 let telemetry_events = telemetry.events();
2212 assert!(telemetry_events.iter().any(|event| {
2213 event.kind == TelemetryEventKind::StreamInsert && event.count == Some(1)
2214 }));
2215 assert!(telemetry_events.iter().any(|event| {
2216 event.kind == TelemetryEventKind::StreamDelete && event.count == Some(1)
2217 }));
2218 }
2219}