use crate::nested::NestedLiveViewInstance;
use crate::{
ClientMessage, ComponentRender, Context, DynamicSlotPatch, Event, Html, LiveResult, LiveView,
NestedLiveViewId, NestedLiveViewSnapshot, NestedLiveViewState, NoopTelemetrySink,
ServerMessage, SessionId, SessionReplayMetadata, SessionReplayTrace, SessionTraceRecorder,
ShellyError, TelemetryEvent, TelemetryEventKind, TelemetrySink, TraceRedactionPolicy,
PROTOCOL_VERSION_V1,
};
use serde_json::{json, Value};
use std::{
collections::BTreeMap,
sync::Arc,
time::{Duration, Instant},
};
/// Protocol version this session reports in `Hello` and requires in `Connect` messages.
const PROTOCOL_VERSION: &str = PROTOCOL_VERSION_V1;
/// Schedule id under which the coalesced-render flush timer is registered on (and cancelled
/// from) the root context.
const INTERNAL_RENDER_FLUSH_ID: &str = "__shelly_internal_render_flush";
/// Event name carried by the scheduled render flush. An untargeted event with this name is
/// routed to the internal flush handler instead of the view.
pub const INTERNAL_RENDER_FLUSH_EVENT: &str = "__shelly_internal_render_flush";
/// Server-side state for one live session: a root view, its context, any nested child views,
/// and the bookkeeping needed to number, diff, throttle, and trace renders.
pub struct LiveSession {
/// Root view that handles events and produces renders.
view: Box<dyn LiveView>,
/// Root context (session id, route, tenant id, target DOM id, pending pushes and commands).
ctx: Context,
/// Revision stamped on outgoing patches and diffs; starts at 0 and is incremented by the
/// render methods (the snapshot patch reuses the current value).
revision: u64,
/// Most recent root render, used as the baseline when diffing the next one.
last_render: Option<Html>,
/// Sink that receives telemetry events; defaults to `NoopTelemetrySink`.
telemetry: Arc<dyn TelemetrySink>,
/// Render cadence in milliseconds used when the context provides no override; 0 renders
/// every request immediately.
default_render_cadence_ms: u64,
/// When the last root render was produced; `None` until the first render.
last_render_sent_at: Option<Instant>,
/// Number of root render requests deferred since the last render.
coalesced_render_count: usize,
/// When the first of the currently deferred render requests arrived.
coalesced_render_since: Option<Instant>,
/// Deadline of the flush that is currently scheduled, if any.
render_flush_deadline: Option<Instant>,
/// Present while trace capture is enabled.
trace_recorder: Option<SessionTraceRecorder>,
/// Nested child views keyed by their id.
children: BTreeMap<NestedLiveViewId, NestedLiveViewInstance>,
}
impl LiveSession {
/// Creates a session for `view` rendered into `target_dom_id`, using a session id obtained
/// from `SessionId::new()` and the default route.
pub fn new(view: Box<dyn LiveView>, target_dom_id: impl Into<String>) -> Self {
    let session_id = SessionId::new().to_string();
    Self::new_with_session_id(view, session_id, target_dom_id)
}
/// Creates a session with an explicit session id, routed at `/` with no route params.
pub fn new_with_session_id(
    view: Box<dyn LiveView>,
    session_id: impl Into<String>,
    target_dom_id: impl Into<String>,
) -> Self {
    let no_params = BTreeMap::new();
    Self::new_with_route_and_session_id(view, session_id, target_dom_id, "/", no_params)
}
/// Creates a session at the given route, using a session id obtained from `SessionId::new()`.
pub fn new_with_route(
    view: Box<dyn LiveView>,
    target_dom_id: impl Into<String>,
    route_path: impl Into<String>,
    route_params: BTreeMap<String, String>,
) -> Self {
    let session_id = SessionId::new().to_string();
    Self::new_with_route_and_session_id(view, session_id, target_dom_id, route_path, route_params)
}
pub fn new_with_route_and_session_id(
view: Box<dyn LiveView>,
session_id: impl Into<String>,
target_dom_id: impl Into<String>,
route_path: impl Into<String>,
route_params: BTreeMap<String, String>,
) -> Self {
let mut ctx =
Context::new_with_session_id(SessionId::from_string(session_id.into()), target_dom_id);
ctx.set_route(route_path, route_params);
Self {
view,
ctx,
revision: 0,
last_render: None,
telemetry: Arc::new(NoopTelemetrySink),
default_render_cadence_ms: 0,
last_render_sent_at: None,
coalesced_render_count: 0,
coalesced_render_since: None,
render_flush_deadline: None,
trace_recorder: None,
children: BTreeMap::new(),
}
}
/// Replaces the sink that receives this session's telemetry events.
pub fn set_telemetry_sink(&mut self, telemetry: Arc<dyn TelemetrySink>) {
    self.telemetry = telemetry;
}
/// Sets the render cadence (in milliseconds) used when the context supplies no override.
pub fn set_default_render_cadence_ms(&mut self, cadence_ms: u64) {
    self.default_render_cadence_ms = cadence_ms;
}
/// Mounts the root view: marks the context connected, then runs the view's `mount` followed
/// by `handle_params`.
///
/// On failure the context is marked disconnected again and any pushes, pub/sub commands and
/// runtime commands queued so far are discarded. A `Mount` telemetry event carrying the
/// outcome and latency is emitted either way.
pub fn mount(&mut self) -> LiveResult {
    let started = Instant::now();
    self.ctx.set_connected(true);
    let outcome = match self.view.mount(&mut self.ctx) {
        Ok(_) => self.view.handle_params(&mut self.ctx),
        Err(err) => Err(err),
    };
    if outcome.is_err() {
        self.ctx.set_connected(false);
        // Drop anything the failed mount queued so it cannot leak to the client.
        let _ = self.ctx.drain_pushes();
        let _ = self.ctx.drain_pubsub_commands();
        let _ = self.ctx.drain_runtime_commands();
    }
    let event = self
        .telemetry_event(TelemetryEventKind::Mount)
        .with_ok(outcome.is_ok())
        .with_latency_ms(started.elapsed().as_millis() as u64);
    self.emit_telemetry(event);
    outcome
}
/// Removes and returns all pending pub/sub commands: the root context's first, then each
/// nested child's in child-id order.
pub fn drain_pubsub_commands(&mut self) -> Vec<crate::PubSubCommand> {
    let mut drained = self.ctx.drain_pubsub_commands();
    drained.extend(
        self.children
            .values_mut()
            .flat_map(|child| child.ctx.drain_pubsub_commands()),
    );
    drained
}
/// Removes and returns all pending runtime commands: the root context's first, then each
/// nested child's in child-id order.
pub fn drain_runtime_commands(&mut self) -> Vec<crate::RuntimeCommand> {
    let mut drained = self.ctx.drain_runtime_commands();
    drained.extend(
        self.children
            .values_mut()
            .flat_map(|child| child.ctx.drain_runtime_commands()),
    );
    drained
}
/// Returns the session id held by the root context.
pub fn session_id(&self) -> &str {
    let id = self.ctx.session_id();
    id.as_str()
}
/// Returns the root context's current route path.
pub fn route_path(&self) -> &str {
    self.ctx.route_path()
}
/// Returns the root context's current route parameters.
pub fn route_params(&self) -> &BTreeMap<String, String> {
    self.ctx.route_params()
}
/// Returns the tenant id recorded on the root context, if one has been set.
pub fn tenant_id(&self) -> Option<&str> {
    self.ctx.tenant_id()
}
/// Records `tenant_id` on the root context.
pub fn set_tenant_id(&mut self, tenant_id: impl Into<String>) {
    self.ctx.set_tenant_id(tenant_id);
}
/// Forwards an optional tenant id to the root context.
pub fn set_tenant_id_optional(&mut self, tenant_id: Option<String>) {
    self.ctx.set_tenant_id_optional(tenant_id);
}
/// Returns the revision stamped on the most recently produced patch or diff (0 before any).
pub fn revision(&self) -> u64 {
    self.revision
}
/// Returns one snapshot per nested child, in child-id order.
pub fn child_snapshots(&self) -> Vec<NestedLiveViewSnapshot> {
    let mut snapshots = Vec::with_capacity(self.children.len());
    for (id, child) in &self.children {
        snapshots.push(child.snapshot(id.clone()));
    }
    snapshots
}
/// Returns the lifecycle state of the nested child `id`, or `None` when no such child exists.
pub fn child_state(&self, id: impl AsRef<str>) -> Option<NestedLiveViewState> {
    let key = NestedLiveViewId::new(id.as_ref());
    let child = self.children.get(&key)?;
    Some(child.state)
}
/// Mounts a nested child view that inherits the session's current route path and params.
///
/// See `mount_child_with_route` for the error conditions and the returned message.
pub fn mount_child(
    &mut self,
    id: impl Into<NestedLiveViewId>,
    view: Box<dyn LiveView>,
    target_dom_id: impl Into<String>,
) -> LiveResult<ServerMessage> {
    let inherited_path = self.route_path().to_string();
    let inherited_params = self.route_params().clone();
    self.mount_child_with_route(id, view, target_dom_id, inherited_path, inherited_params)
}
/// Mounts a nested child view under `id`, rendering into `target_dom_id` at the given route.
///
/// The child gets its own context that shares the session id and tenant id of the root and
/// is marked connected. The view's `mount` and `handle_params` run before the child is
/// registered, so a failing child is never stored.
///
/// Returns the first render of the child (a patch, or a diff when one can be computed).
///
/// # Errors
///
/// * `ShellyError::InvalidMessage` when a child with the same id already exists.
/// * Whatever error the child's `mount` or `handle_params` returns; in that case a nested
///   `mount` lifecycle telemetry event with `ok = false` is emitted, mirroring how the root
///   `mount` reports failures.
pub fn mount_child_with_route(
    &mut self,
    id: impl Into<NestedLiveViewId>,
    view: Box<dyn LiveView>,
    target_dom_id: impl Into<String>,
    route_path: impl Into<String>,
    route_params: BTreeMap<String, String>,
) -> LiveResult<ServerMessage> {
    let id = id.into();
    let target_dom_id = target_dom_id.into();
    if self.children.contains_key(&id) {
        return Err(ShellyError::InvalidMessage(format!(
            "nested LiveView `{id}` is already mounted"
        )));
    }
    let mut ctx = Context::new_with_session_id(
        SessionId::from_string(self.session_id().to_string()),
        target_dom_id,
    );
    ctx.set_connected(true);
    ctx.set_tenant_id_optional(self.tenant_id().map(ToString::to_string));
    ctx.set_route(route_path, route_params);
    let mut child = NestedLiveViewInstance {
        view,
        ctx,
        state: NestedLiveViewState::Mounted,
        last_render: None,
    };
    let mounted = match child.view.mount(&mut child.ctx) {
        Ok(_) => child.view.handle_params(&mut child.ctx),
        Err(err) => Err(err),
    };
    if mounted.is_err() {
        // Report the failure before bailing out so it is visible to telemetry consumers.
        self.emit_child_lifecycle_telemetry(&id, "mount", false);
    }
    mounted?;
    self.children.insert(id.clone(), child);
    let patch = self.render_child_update_by_id(&id)?;
    self.emit_child_lifecycle_telemetry(&id, "mount", true);
    Ok(patch)
}
/// Applies a new route to the mounted child `id`, runs its `handle_params`, and re-renders it.
///
/// # Errors
///
/// Fails when the child does not exist, is suspended or terminated, or when its
/// `handle_params` returns an error.
pub fn update_child_params(
    &mut self,
    id: impl AsRef<str>,
    route_path: impl Into<String>,
    route_params: BTreeMap<String, String>,
) -> LiveResult<ServerMessage> {
    let id = NestedLiveViewId::new(id.as_ref());
    let child = self.active_child_mut(&id)?;
    child.ctx.set_route(route_path, route_params);
    child.view.handle_params(&mut child.ctx)?;
    self.emit_child_lifecycle_telemetry(&id, "params_update", true);
    self.render_child_update_by_id(&id)
}
/// Marks the child `id` as suspended and disconnects its context.
///
/// # Errors
///
/// Fails when the child does not exist or has already been terminated.
pub fn suspend_child(&mut self, id: impl AsRef<str>) -> LiveResult {
    let id = NestedLiveViewId::new(id.as_ref());
    let child = self.child_mut(&id)?;
    match child.state {
        NestedLiveViewState::Terminated => {
            return Err(ShellyError::InvalidMessage(format!(
                "nested LiveView `{id}` is terminated"
            )));
        }
        _ => {
            child.state = NestedLiveViewState::Suspended;
            child.ctx.set_connected(false);
        }
    }
    self.emit_child_lifecycle_telemetry(&id, "suspend", true);
    Ok(())
}
/// Marks the child `id` as mounted again, reconnects its context, and re-renders it.
///
/// # Errors
///
/// Fails when the child does not exist or has already been terminated.
pub fn resume_child(&mut self, id: impl AsRef<str>) -> LiveResult<ServerMessage> {
    let id = NestedLiveViewId::new(id.as_ref());
    let child = self.child_mut(&id)?;
    match child.state {
        NestedLiveViewState::Terminated => {
            return Err(ShellyError::InvalidMessage(format!(
                "nested LiveView `{id}` is terminated"
            )));
        }
        _ => {
            child.state = NestedLiveViewState::Mounted;
            child.ctx.set_connected(true);
        }
    }
    self.emit_child_lifecycle_telemetry(&id, "resume", true);
    self.render_child_update_by_id(&id)
}
/// Marks the child `id` as terminated, disconnects its context, and returns a patch that
/// replaces the child's DOM target with empty HTML.
///
/// The child entry stays registered (in the terminated state). The session revision is
/// incremented for the returned patch.
///
/// # Errors
///
/// Fails when the child does not exist.
pub fn terminate_child(&mut self, id: impl AsRef<str>) -> LiveResult<ServerMessage> {
    let id = NestedLiveViewId::new(id.as_ref());
    let child = self.child_mut(&id)?;
    child.state = NestedLiveViewState::Terminated;
    child.ctx.set_connected(false);
    let target = child.ctx.target_dom_id().to_string();
    self.revision += 1;
    self.emit_child_lifecycle_telemetry(&id, "terminate", true);
    let patch_event = self
        .telemetry_event(TelemetryEventKind::Patch)
        .with_bytes(0)
        .with_count(1)
        .with_attribute("nested_live_view".to_string(), Value::Bool(true))
        .with_attribute("nested_child_id".to_string(), Value::String(id.to_string()));
    self.emit_telemetry(patch_event);
    Ok(ServerMessage::Patch {
        target,
        html: String::new(),
        revision: self.revision,
    })
}
/// Starts (or restarts) trace capture with the given redaction policy.
///
/// The recorder's metadata is a snapshot of the session's protocol, ids, target and route
/// at the time of this call. Any previously installed recorder is replaced.
pub fn enable_trace_capture(&mut self, policy: TraceRedactionPolicy) {
    let metadata = SessionReplayMetadata {
        protocol: PROTOCOL_VERSION.to_string(),
        session_id: self.session_id().to_string(),
        tenant_id: self.tenant_id().map(ToString::to_string),
        target_id: self.ctx.target_dom_id().to_string(),
        route_path: self.route_path().to_string(),
        route_params: self.route_params().clone(),
    };
    let recorder = SessionTraceRecorder::new(metadata, policy);
    self.trace_recorder = Some(recorder);
}
/// Stops trace capture and discards the current recorder, if any.
pub fn disable_trace_capture(&mut self) {
    self.trace_recorder = None;
}
/// Reports whether a trace recorder is currently installed.
pub fn trace_capture_enabled(&self) -> bool {
    self.trace_recorder.is_some()
}
pub fn trace_artifact(&self) -> Option<SessionReplayTrace> {
self.trace_recorder
.as_ref()
.map(SessionTraceRecorder::artifact)
}
pub fn take_trace_artifact(&mut self) -> Option<SessionReplayTrace> {
self.trace_recorder
.take()
.map(SessionTraceRecorder::into_artifact)
}
/// Hands `event` to the configured sink; the sink's return value is deliberately ignored.
fn emit_telemetry(&self, event: TelemetryEvent) {
    let _ = self.telemetry.emit(event);
}
/// Builds a telemetry event of `kind` pre-filled with the session id and route path, plus a
/// `tenant_id` attribute when a tenant id is set.
fn telemetry_event(&self, kind: TelemetryEventKind) -> TelemetryEvent {
    let base = TelemetryEvent::new(kind)
        .with_session(self.session_id().to_string())
        .with_route(self.ctx.route_path().to_string());
    match self.tenant_id() {
        Some(tenant_id) => base.with_attribute(
            "tenant_id".to_string(),
            Value::String(tenant_id.to_string()),
        ),
        None => base,
    }
}
/// Returns the render cadence in effect: the context's override when present, otherwise the
/// session default.
fn effective_render_cadence_ms(&self) -> u64 {
    match self.ctx.render_cadence_override_ms() {
        Some(override_ms) => override_ms,
        None => self.default_render_cadence_ms,
    }
}
/// Clears all render-coalescing bookkeeping: the pending flush deadline, the time coalescing
/// started, and the deferred-render counter.
fn reset_coalesced_render_state(&mut self) {
    self.render_flush_deadline = None;
    self.coalesced_render_since = None;
    self.coalesced_render_count = 0;
}
/// Records that a root render was just produced.
///
/// If renders had been deferred, the scheduled internal flush is cancelled because this
/// render supersedes it. The coalescing state is then reset.
fn mark_render_sent_now(&mut self) {
    let had_deferred_renders = self.coalesced_render_count > 0;
    if had_deferred_renders {
        self.ctx.cancel_schedule_internal(INTERNAL_RENDER_FLUSH_ID);
    }
    self.last_render_sent_at = Some(Instant::now());
    self.reset_coalesced_render_state();
}
/// Emits a `RenderCadence` telemetry event describing a cadence decision.
///
/// * `phase` - label stored in the `render_phase` attribute.
/// * `cadence_ms` - cadence in effect, stored in `render_cadence_ms`.
/// * `coalesced_count` - stored verbatim in `coalesced_count`; the event's own count is
///   this value clamped to at least 1.
/// * `latency_ms` - event latency; `None` is reported as 0.
fn emit_render_cadence_telemetry(
    &self,
    phase: &'static str,
    cadence_ms: u64,
    coalesced_count: usize,
    latency_ms: Option<u64>,
) {
    let event = self
        .telemetry_event(TelemetryEventKind::RenderCadence)
        .with_ok(true)
        .with_count(coalesced_count.max(1))
        .with_latency_ms(latency_ms.unwrap_or(0))
        .with_attribute("render_phase".to_string(), Value::String(phase.to_string()))
        .with_attribute("render_cadence_ms".to_string(), Value::from(cadence_ms))
        .with_attribute("coalesced_count".to_string(), Value::from(coalesced_count));
    self.emit_telemetry(event);
}
/// Schedules a one-shot internal flush event on the root context after `delay`.
fn schedule_render_flush(&mut self, delay: Duration) {
    let delay_ms = delay.as_millis() as u64;
    let payload = json!({"internal": true});
    self.ctx.schedule_internal_once(
        INTERNAL_RENDER_FLUSH_ID,
        delay_ms,
        INTERNAL_RENDER_FLUSH_EVENT,
        payload,
    );
}
/// Looks up the child `id` regardless of its lifecycle state.
///
/// # Errors
///
/// `ShellyError::InvalidMessage` when no child with that id is registered.
fn child_mut(&mut self, id: &NestedLiveViewId) -> LiveResult<&mut NestedLiveViewInstance> {
    match self.children.get_mut(id) {
        Some(child) => Ok(child),
        None => Err(ShellyError::InvalidMessage(format!(
            "nested LiveView `{id}` is not mounted"
        ))),
    }
}
/// Looks up the child `id` and requires it to be in the mounted state.
///
/// # Errors
///
/// `ShellyError::InvalidMessage` when the child is missing, suspended, or terminated.
fn active_child_mut(
    &mut self,
    id: &NestedLiveViewId,
) -> LiveResult<&mut NestedLiveViewInstance> {
    let child = self.child_mut(id)?;
    let inactive_reason = match child.state {
        NestedLiveViewState::Mounted => return Ok(child),
        NestedLiveViewState::Suspended => "suspended",
        NestedLiveViewState::Terminated => "terminated",
    };
    Err(ShellyError::InvalidMessage(format!(
        "nested LiveView `{id}` is {inactive_reason}"
    )))
}
/// Resolves an event target to a child id.
///
/// A child whose id equals `target` wins; otherwise the first child (in id order) whose DOM
/// target id equals `target` is returned.
fn child_id_for_target(&self, target: &str) -> Option<NestedLiveViewId> {
    let by_id = NestedLiveViewId::new(target);
    if self.children.contains_key(&by_id) {
        return Some(by_id);
    }
    for (id, child) in &self.children {
        if child.ctx.target_dom_id() == target {
            return Some(id.clone());
        }
    }
    None
}
/// Emits a `Mount`-kind telemetry event tagged as a nested lifecycle transition.
///
/// The event carries `nested_live_view = true`, the `lifecycle` label, and the child id.
fn emit_child_lifecycle_telemetry(
    &self,
    id: &NestedLiveViewId,
    lifecycle: &'static str,
    ok: bool,
) {
    let event = self
        .telemetry_event(TelemetryEventKind::Mount)
        .with_ok(ok)
        .with_attribute(String::from("nested_live_view"), Value::Bool(true))
        .with_attribute(
            String::from("nested_lifecycle"),
            Value::String(lifecycle.to_owned()),
        )
        .with_attribute(String::from("nested_child_id"), Value::String(id.to_string()));
    self.emit_telemetry(event);
}
/// Renders the mounted child `id` and returns the update for its DOM target.
///
/// The new render becomes the child's diff baseline and its pending render-after-event flag
/// is cleared. A `Diff` is returned when the previous and new renders can be diffed;
/// otherwise a full `Patch`. The session revision is incremented and matching `Diff` /
/// `Patch` telemetry (tagged with the child id) is emitted.
///
/// # Errors
///
/// Fails when the child is missing, suspended, or terminated.
fn render_child_update_by_id(&mut self, id: &NestedLiveViewId) -> LiveResult<ServerMessage> {
    let child = self.active_child_mut(id)?;
    let next = child.view.render();
    // Swap in the new baseline and keep the old one for diffing.
    let previous = child.last_render.replace(next.clone());
    let _ = child.ctx.take_render_after_event();
    let target = child.ctx.target_dom_id().to_string();

    self.revision += 1;
    let revision = self.revision;
    let message = match diff_template(previous.as_ref(), &next) {
        Some(slots) => {
            let bytes = slots.iter().map(|slot| slot.html.len()).sum();
            let event = self
                .telemetry_event(TelemetryEventKind::Diff)
                .with_bytes(bytes)
                .with_count(slots.len())
                .with_attribute("nested_live_view".to_string(), Value::Bool(true))
                .with_attribute("nested_child_id".to_string(), Value::String(id.to_string()));
            self.emit_telemetry(event);
            ServerMessage::Diff {
                target,
                revision,
                slots,
            }
        }
        None => {
            let html = next.as_str().to_string();
            let event = self
                .telemetry_event(TelemetryEventKind::Patch)
                .with_bytes(html.len())
                .with_count(1)
                .with_attribute("nested_live_view".to_string(), Value::Bool(true))
                .with_attribute("nested_child_id".to_string(), Value::String(id.to_string()));
            self.emit_telemetry(event);
            ServerMessage::Patch {
                target,
                html,
                revision,
            }
        }
    };
    Ok(message)
}
fn handle_child_event(&mut self, id: NestedLiveViewId, mut event: Event) -> Vec<ServerMessage> {
event.target = Some(id.to_string());
let result = {
let child = match self.active_child_mut(&id) {
Ok(child) => child,
Err(err) => {
return vec![ServerMessage::Error {
message: err.to_string(),
code: Some("nested_live_view_unavailable".to_string()),
}];
}
};
child.view.handle_event(event, &mut child.ctx)
};
if let Err(err) = result {
if let Ok(child) = self.child_mut(&id) {
let _ = child.ctx.drain_pushes();
let _ = child.ctx.drain_pubsub_commands();
let _ = child.ctx.drain_runtime_commands();
let _ = child.ctx.take_render_after_event();
}
return vec![ServerMessage::Error {
message: err.to_string(),
code: Some("nested_event_failed".to_string()),
}];
}
let (mut messages, render) = match self.child_mut(&id) {
Ok(child) => (
child.ctx.drain_pushes(),
child.ctx.take_render_after_event(),
),
Err(err) => {
return vec![ServerMessage::Error {
message: err.to_string(),
code: Some("nested_live_view_unavailable".to_string()),
}];
}
};
if render {
match self.render_child_update_by_id(&id) {
Ok(message) => messages.push(message),
Err(err) => messages.push(ServerMessage::Error {
message: err.to_string(),
code: Some("nested_event_failed".to_string()),
}),
}
}
messages
}
/// Decides whether a requested root re-render is produced now or deferred, according to the
/// effective render cadence.
///
/// * Cadence 0: render immediately and push the result onto `messages`.
/// * No render has been sent yet, or at least `cadence_ms` has elapsed since the last one:
///   render immediately.
/// * Otherwise: count the request as coalesced and make sure a flush is scheduled for the
///   remaining time; nothing is pushed onto `messages`.
///
/// Every path emits render-cadence telemetry describing the decision, except an immediate
/// render at cadence 0 with nothing deferred.
fn coalesce_or_render_root_update(&mut self, messages: &mut Vec<ServerMessage>) {
let cadence_ms = self.effective_render_cadence_ms();
// Cadence 0 means no throttling.
if cadence_ms == 0 {
// Renders deferred under an earlier non-zero cadence are flushed by this render.
if self.coalesced_render_count > 0 {
let since = self.coalesced_render_since;
self.emit_render_cadence_telemetry(
"flush_immediate_override",
cadence_ms,
self.coalesced_render_count,
since.map(|started| started.elapsed().as_millis() as u64),
);
}
// `render_update` resets the coalescing state, so the telemetry above must read it first.
messages.push(self.render_update());
return;
}
let now = Instant::now();
let elapsed_since_render = self
.last_render_sent_at
.map(|at| now.saturating_duration_since(at));
// The very first render is never throttled.
let can_render_now = match elapsed_since_render {
None => true,
Some(elapsed) => elapsed.as_millis() as u64 >= cadence_ms,
};
if can_render_now {
if self.coalesced_render_count > 0 {
// Latency reported here is the time since the first deferred request.
let since = self.coalesced_render_since;
self.emit_render_cadence_telemetry(
"flush",
cadence_ms,
self.coalesced_render_count,
since.map(|started| started.elapsed().as_millis() as u64),
);
} else {
self.emit_render_cadence_telemetry("immediate", cadence_ms, 0, Some(0));
}
messages.push(self.render_update());
return;
}
// Too soon since the last render: defer this request.
let elapsed_ms = elapsed_since_render
.map(|elapsed| elapsed.as_millis() as u64)
.unwrap_or(0);
// Wait at least 1 ms so the scheduled delay is never zero.
let remaining_ms = cadence_ms.saturating_sub(elapsed_ms).max(1);
self.coalesced_render_count += 1;
if self.coalesced_render_since.is_none() {
self.coalesced_render_since = Some(now);
}
// For the "coalesced" phase the latency field carries the remaining wait in milliseconds.
self.emit_render_cadence_telemetry(
"coalesced",
cadence_ms,
self.coalesced_render_count,
Some(remaining_ms),
);
let deadline = now + Duration::from_millis(remaining_ms);
// Only (re)schedule when no flush is pending or the new deadline is earlier.
let should_schedule = match self.render_flush_deadline {
Some(existing_deadline) => deadline < existing_deadline,
None => true,
};
if should_schedule {
self.render_flush_deadline = Some(deadline);
self.schedule_render_flush(Duration::from_millis(remaining_ms));
}
}
/// Handles the scheduled flush event.
///
/// Returns nothing when no renders are deferred. Otherwise clears the recorded flush
/// deadline and re-runs the cadence decision, which either renders or defers again.
fn handle_internal_render_flush_event(&mut self) -> Vec<ServerMessage> {
    let mut messages = Vec::new();
    if self.coalesced_render_count > 0 {
        self.render_flush_deadline = None;
        self.coalesce_or_render_root_update(&mut messages);
    }
    messages
}
/// Renders the root view without touching the revision, diff baseline, or telemetry.
pub fn render_html(&self) -> Html {
    self.view.render()
}
/// Updates the root context's route and then runs the root view's `handle_params`.
///
/// The route is already updated when `handle_params` fails; its error is returned as-is.
pub fn patch_route(
    &mut self,
    route_path: impl Into<String>,
    route_params: BTreeMap<String, String>,
) -> LiveResult {
    self.ctx.set_route(route_path, route_params);
    let outcome = self.view.handle_params(&mut self.ctx);
    outcome
}
/// Builds the `Hello` message for this session: session id, root DOM target, current
/// revision and protocol version. All resume-related fields and `server_revision` are `None`.
pub fn hello(&self) -> ServerMessage {
    let session_id = self.session_id().to_string();
    let target = self.ctx.target_dom_id().to_string();
    let protocol = PROTOCOL_VERSION.to_string();
    ServerMessage::Hello {
        session_id,
        target,
        revision: self.revision,
        protocol,
        server_revision: None,
        resume_status: None,
        resume_reason: None,
        resume_token: None,
        resume_expires_in_ms: None,
    }
}
/// Renders the root view and returns it as a full `Patch` under a new revision.
///
/// The render becomes the diff baseline, `Patch` telemetry is emitted, and the render is
/// recorded as sent (which resets any coalescing state).
pub fn render_patch(&mut self) -> ServerMessage {
    self.revision += 1;
    let rendered = self.view.render();
    let html = rendered.as_str().to_string();
    let event = self
        .telemetry_event(TelemetryEventKind::Patch)
        .with_bytes(html.len())
        .with_count(1);
    self.last_render = Some(rendered);
    let message = ServerMessage::Patch {
        target: self.ctx.target_dom_id().to_string(),
        html,
        revision: self.revision,
    };
    self.emit_telemetry(event);
    self.mark_render_sent_now();
    message
}
/// Renders the root view and returns the smallest update available under a new revision.
///
/// A `Diff` of changed dynamic slots is returned when the new render can be diffed against
/// the previous one; otherwise a full `Patch`. Matching telemetry is emitted, the render
/// becomes the new baseline, and it is recorded as sent.
pub fn render_update(&mut self) -> ServerMessage {
    let next = self.view.render();
    let diff = self.diff_against_last_render(&next);
    self.revision += 1;
    let revision = self.revision;
    let target = self.ctx.target_dom_id().to_string();
    let message = match diff {
        Some(slots) => {
            let bytes = slots.iter().map(|slot| slot.html.len()).sum();
            let event = self
                .telemetry_event(TelemetryEventKind::Diff)
                .with_bytes(bytes)
                .with_count(slots.len());
            self.emit_telemetry(event);
            ServerMessage::Diff {
                target,
                revision,
                slots,
            }
        }
        None => {
            let html = next.as_str().to_string();
            let event = self
                .telemetry_event(TelemetryEventKind::Patch)
                .with_bytes(html.len())
                .with_count(1);
            self.emit_telemetry(event);
            ServerMessage::Patch {
                target,
                html,
                revision,
            }
        }
    };
    self.last_render = Some(next);
    self.mark_render_sent_now();
    message
}
/// Renders the root view and returns it as a full `Patch` at the *current* revision.
///
/// Unlike `render_patch`, the revision is not incremented. The render becomes the diff
/// baseline, `Patch` telemetry tagged `snapshot = true` is emitted, and the render is
/// recorded as sent.
pub fn render_snapshot_patch(&mut self) -> ServerMessage {
    let rendered = self.view.render();
    let html = rendered.as_str().to_string();
    self.last_render = Some(rendered);
    let event = self
        .telemetry_event(TelemetryEventKind::Patch)
        .with_bytes(html.len())
        .with_count(1)
        .with_attribute("snapshot".to_string(), Value::Bool(true));
    self.emit_telemetry(event);
    let target = self.ctx.target_dom_id().to_string();
    let revision = self.revision;
    self.mark_render_sent_now();
    ServerMessage::Patch {
        target,
        html,
        revision,
    }
}
/// Turns a component render into a `Patch` targeting the component's id under a new revision.
///
/// Emits `Patch` telemetry tagged `component_patch = true`. The root diff baseline and the
/// render-cadence state are not touched.
pub fn render_component_update(&mut self, render: ComponentRender) -> ServerMessage {
    self.revision += 1;
    let (id, rendered) = render.into_parts();
    let html = rendered.as_str().to_string();
    let event = self
        .telemetry_event(TelemetryEventKind::Patch)
        .with_bytes(html.len())
        .with_count(1)
        .with_attribute("component_patch".to_string(), Value::Bool(true));
    self.emit_telemetry(event);
    ServerMessage::Patch {
        target: id.to_string(),
        html,
        revision: self.revision,
    }
}
pub fn handle_client_message(&mut self, message: ClientMessage) -> Vec<ServerMessage> {
let started = Instant::now();
let revision_before = self.revision;
let captured_message = message.clone();
if let Some(tenant_id) = tenant_id_for_client_message(&message) {
self.set_tenant_id(tenant_id);
}
let event_name = match &message {
ClientMessage::Event { event, .. } if event != INTERNAL_RENDER_FLUSH_EVENT => {
Some(event.clone())
}
_ => None,
};
let messages = match message {
ClientMessage::Connect { protocol, .. } => {
if protocol == PROTOCOL_VERSION {
Vec::new()
} else {
vec![ServerMessage::Error {
message: format!(
"unsupported protocol in connect: expected {PROTOCOL_VERSION}, got {protocol}"
),
code: Some("unsupported_protocol".to_string()),
}]
}
}
ClientMessage::Ping { nonce } => vec![ServerMessage::Pong { nonce }],
ClientMessage::PatchUrl { to } => vec![ServerMessage::PatchUrl { to }],
ClientMessage::Navigate { to } => vec![ServerMessage::Navigate { to }],
ClientMessage::UploadStart { .. }
| ClientMessage::UploadChunk { .. }
| ClientMessage::UploadComplete { .. } => vec![ServerMessage::Error {
message: "upload protocol messages must be handled by a transport adapter"
.to_string(),
code: Some("unsupported_upload_transport".to_string()),
}],
event_message @ ClientMessage::Event { .. } => {
let is_internal_flush = matches!(
&event_message,
ClientMessage::Event { event, target, .. }
if event == INTERNAL_RENDER_FLUSH_EVENT && target.is_none()
);
if is_internal_flush {
self.handle_internal_render_flush_event()
} else {
match Event::try_from(event_message) {
Err(err) => vec![ServerMessage::Error {
message: err.to_string(),
code: Some("invalid_event".to_string()),
}],
Ok(event) => {
if let Some(target) = event.target.clone() {
if let Some(child_id) = self.child_id_for_target(&target) {
self.handle_child_event(child_id, event)
} else {
match self.view.handle_component_event(
&target,
event.clone(),
&mut self.ctx,
) {
Ok(Some(render)) => {
let mut messages = self.ctx.drain_pushes();
if self.ctx.take_render_after_event() {
messages.push(self.render_component_update(render));
}
messages
}
Ok(None) => {
if let Err(err) =
self.view.handle_event(event, &mut self.ctx)
{
let _ = self.ctx.drain_pushes();
let _ = self.ctx.drain_pubsub_commands();
let _ = self.ctx.drain_runtime_commands();
let _ = self.ctx.take_render_after_event();
vec![ServerMessage::Error {
message: err.to_string(),
code: Some("event_failed".to_string()),
}]
} else {
let mut messages = self.ctx.drain_pushes();
if self.ctx.take_render_after_event() {
self.coalesce_or_render_root_update(
&mut messages,
);
}
messages
}
}
Err(err) => {
let _ = self.ctx.drain_pushes();
let _ = self.ctx.drain_pubsub_commands();
let _ = self.ctx.drain_runtime_commands();
let _ = self.ctx.take_render_after_event();
vec![ServerMessage::Error {
message: err.to_string(),
code: Some("event_failed".to_string()),
}]
}
}
}
} else if let Err(err) = self.view.handle_event(event, &mut self.ctx) {
let _ = self.ctx.drain_pushes();
let _ = self.ctx.drain_pubsub_commands();
let _ = self.ctx.drain_runtime_commands();
let _ = self.ctx.take_render_after_event();
vec![ServerMessage::Error {
message: err.to_string(),
code: Some("event_failed".to_string()),
}]
} else {
let mut messages = self.ctx.drain_pushes();
if self.ctx.take_render_after_event() {
self.coalesce_or_render_root_update(&mut messages);
}
messages
}
}
}
}
}
};
for message in &messages {
match message {
ServerMessage::StreamInsert { .. } => {
self.emit_telemetry(
self.telemetry_event(TelemetryEventKind::StreamInsert)
.with_count(1),
);
}
ServerMessage::StreamDelete { .. } => {
self.emit_telemetry(
self.telemetry_event(TelemetryEventKind::StreamDelete)
.with_count(1),
);
}
ServerMessage::StreamBatch { operations, .. } => {
let inserts = operations
.iter()
.filter(|operation| {
matches!(operation, crate::StreamBatchOperation::Insert { .. })
})
.count();
let deletes = operations
.iter()
.filter(|operation| {
matches!(operation, crate::StreamBatchOperation::Delete { .. })
})
.count();
if inserts > 0 {
self.emit_telemetry(
self.telemetry_event(TelemetryEventKind::StreamInsert)
.with_count(inserts),
);
}
if deletes > 0 {
self.emit_telemetry(
self.telemetry_event(TelemetryEventKind::StreamDelete)
.with_count(deletes),
);
}
}
ServerMessage::Error { .. } => {
self.emit_telemetry(
self.telemetry_event(TelemetryEventKind::Error)
.with_ok(false)
.with_count(1),
);
}
_ => {}
}
}
let ok = messages
.iter()
.all(|message| !matches!(message, ServerMessage::Error { .. }));
let mut event_telemetry = self
.telemetry_event(TelemetryEventKind::HandleEvent)
.with_ok(ok)
.with_latency_ms(started.elapsed().as_millis() as u64)
.with_count(messages.len());
if let Some(name) = event_name {
event_telemetry = event_telemetry.with_event_name(name);
}
self.emit_telemetry(event_telemetry);
if let Some(recorder) = self.trace_recorder.as_mut() {
recorder.record_turn(&captured_message, &messages, revision_before, self.revision);
}
messages
}
/// Diffs `next` against the stored root render; `None` means a full patch is required.
fn diff_against_last_render(&self, next: &Html) -> Option<Vec<DynamicSlotPatch>> {
    let baseline = self.last_render.as_ref();
    diff_template(baseline, next)
}
}
/// Computes the dynamic-slot patches that turn `previous` into `next`.
///
/// Returns `None` when there is no previous render, when either render has no template, or
/// when the two templates are not compatible. Otherwise returns one patch per dynamic
/// segment whose content changed, carrying the segment's index and new content (possibly an
/// empty list when nothing changed).
fn diff_template(previous: Option<&Html>, next: &Html) -> Option<Vec<DynamicSlotPatch>> {
    let old_template = previous?.template()?;
    let new_template = next.template()?;
    if !old_template.compatible_with(new_template) {
        return None;
    }
    let mut patches = Vec::new();
    let segment_pairs = old_template
        .dynamic_segments()
        .iter()
        .zip(new_template.dynamic_segments());
    for (index, (before, after)) in segment_pairs.enumerate() {
        if before != after {
            patches.push(DynamicSlotPatch {
                index,
                html: after.clone(),
            });
        }
    }
    Some(patches)
}
/// Extracts a normalized tenant id from a client message, if it carries one.
///
/// * `Connect` - uses the message's `tenant_id` field.
/// * `Event` - uses a string `tenant_id` entry from the event metadata, falling back to a
///   string `tenant_id` entry in the event value.
/// * Any other message yields `None`.
///
/// Blank values are treated as absent.
fn tenant_id_for_client_message(message: &ClientMessage) -> Option<String> {
    match message {
        ClientMessage::Connect { tenant_id, .. } => normalize_tenant_id(tenant_id.as_deref()),
        ClientMessage::Event {
            metadata, value, ..
        } => {
            let from_metadata = metadata.get("tenant_id").and_then(Value::as_str);
            let raw = match from_metadata {
                Some(found) => Some(found),
                None => value.get("tenant_id").and_then(Value::as_str),
            };
            normalize_tenant_id(raw)
        }
        _ => None,
    }
}
/// Trims surrounding whitespace from a tenant id and returns it as an owned string.
///
/// Returns `None` when the input is absent or is empty after trimming.
fn normalize_tenant_id(value: Option<&str>) -> Option<String> {
    let trimmed = value?.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_string())
    }
}
impl Drop for LiveSession {
    /// Marks the root context and every nested child context as disconnected when the
    /// session goes away.
    ///
    /// Child contexts are set connected when they are mounted or resumed, so they are
    /// disconnected here as well rather than only the root.
    fn drop(&mut self) {
        self.ctx.set_connected(false);
        for child in self.children.values_mut() {
            child.ctx.set_connected(false);
        }
    }
}
#[cfg(test)]
mod tests {
use super::{LiveSession, INTERNAL_RENDER_FLUSH_EVENT};
use crate::{
ClientMessage, ComponentId, ComponentRender, Context, DynamicSlotPatch, Event, Html,
LiveComponent, LiveResult, LiveView, MemoryTelemetrySink, NestedLiveViewState,
RuntimeCommand, ServerMessage, ShellyError, StreamBatchOperation, TelemetryEvent,
TelemetryEventKind, TelemetrySink, Template, TraceRedactionPolicy,
};
use serde_json::{json, Value};
use std::{collections::BTreeMap, sync::Arc, thread, time::Duration};
#[derive(Default)]
struct Counter {
count: i64,
}
impl LiveView for Counter {
fn handle_event(&mut self, event: Event, _ctx: &mut Context) -> LiveResult {
match event.name.as_str() {
"inc" => self.count += 1,
"dec" => self.count -= 1,
_ => {}
}
Ok(())
}
fn render(&self) -> Html {
Html::new(format!("<p>{}</p>", self.count))
}
}
#[test]
fn session_dispatches_event_and_patches() {
let mut session = LiveSession::new(Box::<Counter>::default(), "root");
session.mount().unwrap();
let messages = session.handle_client_message(ClientMessage::Event {
event: "inc".to_string(),
target: None,
value: Value::Null,
metadata: Default::default(),
});
assert_eq!(messages.len(), 1);
assert_eq!(
messages[0],
ServerMessage::Patch {
target: "root".to_string(),
html: "<p>1</p>".to_string(),
revision: 1,
}
);
}
#[test]
fn session_responds_to_ping_without_rendering() {
let mut session = LiveSession::new(Box::<Counter>::default(), "root");
let messages = session.handle_client_message(ClientMessage::Ping {
nonce: Some("abc".to_string()),
});
assert_eq!(
messages,
vec![ServerMessage::Pong {
nonce: Some("abc".to_string())
}]
);
assert_eq!(session.revision(), 0);
}
#[test]
fn connect_with_supported_protocol_is_noop() {
let mut session = LiveSession::new(Box::<Counter>::default(), "root");
let messages = session.handle_client_message(ClientMessage::Connect {
protocol: "shelly/1".to_string(),
session_id: None,
last_revision: None,
resume_token: None,
tenant_id: None,
trace_id: None,
span_id: None,
parent_span_id: None,
correlation_id: None,
request_id: None,
});
assert!(messages.is_empty());
}
#[test]
fn connect_with_unsupported_protocol_returns_structured_error() {
let mut session = LiveSession::new(Box::<Counter>::default(), "root");
let messages = session.handle_client_message(ClientMessage::Connect {
protocol: "shelly/2".to_string(),
session_id: None,
last_revision: None,
resume_token: None,
tenant_id: None,
trace_id: None,
span_id: None,
parent_span_id: None,
correlation_id: None,
request_id: None,
});
assert_eq!(
messages,
vec![ServerMessage::Error {
message: "unsupported protocol in connect: expected shelly/1, got shelly/2"
.to_string(),
code: Some("unsupported_protocol".to_string()),
}]
);
}
#[test]
fn connect_and_event_propagate_tenant_context() {
let mut session = LiveSession::new(Box::<Counter>::default(), "root");
let _ = session.handle_client_message(ClientMessage::Connect {
protocol: "shelly/1".to_string(),
session_id: None,
last_revision: None,
resume_token: None,
tenant_id: Some("tenant-a".to_string()),
trace_id: None,
span_id: None,
parent_span_id: None,
correlation_id: None,
request_id: None,
});
assert_eq!(session.tenant_id(), Some("tenant-a"));
let _ = session.handle_client_message(ClientMessage::Event {
event: "inc".to_string(),
target: None,
value: json!({"tenant_id": "tenant-b"}),
metadata: Default::default(),
});
assert_eq!(session.tenant_id(), Some("tenant-b"));
}
#[test]
fn unknown_events_do_not_panic() {
let mut session = LiveSession::new(Box::<Counter>::default(), "root");
session.mount().unwrap();
let messages = session.handle_client_message(ClientMessage::Event {
event: "unknown".to_string(),
target: None,
value: Value::Null,
metadata: Default::default(),
});
assert_eq!(
messages,
vec![ServerMessage::Patch {
target: "root".to_string(),
html: "<p>0</p>".to_string(),
revision: 1,
}]
);
}
#[derive(Default)]
struct FailingEvent {
count: i64,
}
impl LiveView for FailingEvent {
fn handle_event(&mut self, event: Event, ctx: &mut Context) -> LiveResult {
match event.name.as_str() {
"fail" => {
ctx.redirect("/should-not-leak");
Err(ShellyError::Event("boom".to_string()))
}
"inc" => {
self.count += 1;
Ok(())
}
_ => Ok(()),
}
}
fn render(&self) -> Html {
Html::new(format!("<p>{}</p>", self.count))
}
}
#[test]
fn event_errors_do_not_increment_revision_or_leak_pushes() {
let mut session = LiveSession::new(Box::<FailingEvent>::default(), "root");
session.mount().unwrap();
let failure = session.handle_client_message(ClientMessage::Event {
event: "fail".to_string(),
target: None,
value: Value::Null,
metadata: Default::default(),
});
assert_eq!(
failure,
vec![ServerMessage::Error {
message: "event error: boom".to_string(),
code: Some("event_failed".to_string()),
}]
);
assert_eq!(session.revision(), 0);
let success = session.handle_client_message(ClientMessage::Event {
event: "inc".to_string(),
target: None,
value: Value::Null,
metadata: Default::default(),
});
assert_eq!(
success,
vec![ServerMessage::Patch {
target: "root".to_string(),
html: "<p>1</p>".to_string(),
revision: 1,
}]
);
}
#[derive(Default)]
struct TemplateCounter {
count: i64,
}
impl LiveView for TemplateCounter {
fn handle_event(&mut self, event: Event, _ctx: &mut Context) -> LiveResult {
if event.name == "inc" {
self.count += 1;
}
Ok(())
}
fn render(&self) -> Html {
Template::new(vec!["<p>Count: ", "</p>"], vec![self.count.to_string()]).into()
}
}
#[test]
fn template_updates_send_only_changed_dynamic_slots() {
let mut session = LiveSession::new(Box::<TemplateCounter>::default(), "root");
session.mount().unwrap();
assert_eq!(
session.render_patch(),
ServerMessage::Patch {
target: "root".to_string(),
html: r#"<p>Count: <span data-shelly-slot="0">0</span></p>"#.to_string(),
revision: 1,
}
);
let update = session.handle_client_message(ClientMessage::Event {
event: "inc".to_string(),
target: None,
value: Value::Null,
metadata: Default::default(),
});
assert_eq!(
update,
vec![ServerMessage::Diff {
target: "root".to_string(),
revision: 2,
slots: vec![DynamicSlotPatch {
index: 0,
html: "1".to_string(),
}],
}]
);
}
/// A view rendering plain `Html` gets a `Patch` both for the initial render
/// and for a subsequent `inc` event.
#[test]
fn plain_html_keeps_full_patch_fallback() {
    let mut session = LiveSession::new(Box::<Counter>::default(), "root");
    session.mount().unwrap();

    let initial = session.render_patch();
    assert!(matches!(initial, ServerMessage::Patch { .. }));

    let inc = ClientMessage::Event {
        event: "inc".to_string(),
        target: None,
        value: Value::Null,
        metadata: Default::default(),
    };
    let update = session.handle_client_message(inc);
    assert!(matches!(update.as_slice(), [ServerMessage::Patch { .. }]));
}
/// List fixture that reports changes through stream operations on `items`.
#[derive(Default)]
struct StreamList {
    /// Sequence number of the most recently appended item.
    next_id: usize,
}

impl LiveView for StreamList {
    /// `append` streams a new `<li>` with the next id; `delete` streams the
    /// removal of `item-1`. Other events do nothing.
    fn handle_event(&mut self, event: Event, ctx: &mut Context) -> LiveResult {
        let name = event.name.as_str();
        if name == "append" {
            self.next_id += 1;
            let id = format!("item-{}", self.next_id);
            let html = format!(r#"<li id="{id}">Item {}</li>"#, self.next_id);
            ctx.stream_append("items", id, html);
        } else if name == "delete" {
            ctx.stream_delete("items", "item-1");
        }
        Ok(())
    }

    /// The rendered markup is always the empty list container.
    fn render(&self) -> Html {
        Html::new(r#"<ul id="items"></ul>"#)
    }
}
/// After the initial render, `append` and `delete` events are each answered
/// with exactly one stream message and nothing else.
#[test]
fn stream_events_send_only_stream_operations() {
    // Builds an untargeted event message carrying a null payload.
    let event = |name: &str| ClientMessage::Event {
        event: name.to_string(),
        target: None,
        value: Value::Null,
        metadata: Default::default(),
    };

    let mut session = LiveSession::new(Box::<StreamList>::default(), "root");
    session.mount().unwrap();
    let _ = session.render_patch();

    let append = session.handle_client_message(event("append"));
    let expected_insert = vec![ServerMessage::StreamInsert {
        target: "items".to_string(),
        id: "item-1".to_string(),
        html: r#"<li id="item-1">Item 1</li>"#.to_string(),
        at: crate::StreamPosition::Append,
    }];
    assert_eq!(append, expected_insert);

    let delete = session.handle_client_message(event("delete"));
    let expected_delete = vec![ServerMessage::StreamDelete {
        target: "items".to_string(),
        id: "item-1".to_string(),
    }];
    assert_eq!(delete, expected_delete);
}
/// List fixture that appends three items at once via `stream_append_many`.
#[derive(Default)]
struct StreamBatchList {
    /// Sequence number of the most recently generated item.
    next_id: usize,
}

impl LiveView for StreamBatchList {
    /// On `burst_append`, generates three consecutive items and streams them
    /// to `items` in a single call. Other events do nothing.
    fn handle_event(&mut self, event: Event, ctx: &mut Context) -> LiveResult {
        if event.name != "burst_append" {
            return Ok(());
        }
        let items: Vec<_> = (0..3)
            .map(|_| {
                self.next_id += 1;
                let id = format!("item-{}", self.next_id);
                let html = format!(r#"<li id="{id}">Item {}</li>"#, self.next_id);
                (id, html)
            })
            .collect();
        ctx.stream_append_many("items", items);
        Ok(())
    }

    /// The rendered markup is always the empty list container.
    fn render(&self) -> Html {
        Html::new(r#"<ul id="items"></ul>"#)
    }
}
/// A `burst_append` event is answered with a single `StreamBatch` holding the
/// three appended items in order.
#[test]
fn stream_burst_events_send_batch_operations() {
    // Expected batch entry for one appended item.
    let insert = |id: &str, html: &str| StreamBatchOperation::Insert {
        id: id.to_string(),
        html: html.to_string(),
        at: crate::StreamPosition::Append,
    };

    let mut session = LiveSession::new(Box::<StreamBatchList>::default(), "root");
    session.mount().unwrap();
    let _ = session.render_patch();

    let burst = ClientMessage::Event {
        event: "burst_append".to_string(),
        target: None,
        value: Value::Null,
        metadata: Default::default(),
    };
    let messages = session.handle_client_message(burst);

    let expected = ServerMessage::StreamBatch {
        target: "items".to_string(),
        operations: vec![
            insert("item-1", r#"<li id="item-1">Item 1</li>"#),
            insert("item-2", r#"<li id="item-2">Item 2</li>"#),
            insert("item-3", r#"<li id="item-3">Item 3</li>"#),
        ],
    };
    assert_eq!(messages.len(), 1);
    assert_eq!(messages[0], expected);
}
/// Stateless fixture that schedules a recurring runtime event while mounting.
#[derive(Default)]
struct RuntimeScheduler;

impl LiveView for RuntimeScheduler {
    /// Schedules the `heartbeat` interval, dispatching `tick` with a null payload.
    fn mount(&mut self, ctx: &mut Context) -> LiveResult {
        let payload = Value::Null;
        ctx.schedule_interval("heartbeat", 1000, "tick", payload);
        Ok(())
    }

    /// Renders fixed markup.
    fn render(&self) -> Html {
        let markup = "<p>scheduler</p>";
        Html::new(markup)
    }
}
/// The interval scheduled in `RuntimeScheduler::mount` is available from
/// `drain_runtime_commands` once the session has mounted.
#[test]
fn mount_can_queue_runtime_commands() {
    let mut session = LiveSession::new(Box::<RuntimeScheduler>::default(), "root");
    session.mount().unwrap();

    let queued = session.drain_runtime_commands();
    let heartbeat = RuntimeCommand::ScheduleInterval {
        id: "heartbeat".to_string(),
        every_ms: 1000,
        dispatch: crate::RuntimeEvent::new("tick", Value::Null),
    };
    assert_eq!(queued, vec![heartbeat]);
}
/// Counter fixture that requests a 25 ms render cadence when it mounts.
#[derive(Default)]
struct CadencedCounter {
    /// Incremented once per handled `inc` event.
    count: i64,
}

impl LiveView for CadencedCounter {
    /// Sets the render cadence to 25 ms.
    fn mount(&mut self, ctx: &mut Context) -> LiveResult {
        let cadence_ms = 25;
        ctx.set_render_cadence_ms(cadence_ms);
        Ok(())
    }

    /// Increments the counter on `inc`; any other event is accepted unchanged.
    fn handle_event(&mut self, event: Event, _ctx: &mut Context) -> LiveResult {
        match event.name.as_str() {
            "inc" => self.count += 1,
            _ => {}
        }
        Ok(())
    }

    /// Renders the current count inside a paragraph.
    fn render(&self) -> Html {
        let markup = format!("<p>{}</p>", self.count);
        Html::new(markup)
    }
}
/// Exercises render coalescing for `CadencedCounter`, which sets a 25 ms
/// render cadence in `mount`: the first event patches, the second produces no
/// messages but queues a flush, and the flush event emits the combined state.
///
/// NOTE(review): the first two events appear to rely on arriving within the
/// cadence window, so this test is wall-clock sensitive.
#[test]
fn render_cadence_coalesces_bursty_root_patches() {
let mut session = LiveSession::new(Box::<CadencedCounter>::default(), "root");
session.mount().unwrap();
// The first event after mount is answered with a patch.
let first = session.handle_client_message(ClientMessage::Event {
event: "inc".to_string(),
target: None,
value: Value::Null,
metadata: Default::default(),
});
assert!(matches!(first.as_slice(), [ServerMessage::Patch { .. }]));
// A second event sent right away yields no messages at all.
let second = session.handle_client_message(ClientMessage::Event {
event: "inc".to_string(),
target: None,
value: Value::Null,
metadata: Default::default(),
});
assert!(second.is_empty());
// A one-shot runtime command dispatching the internal flush event must have been queued.
let runtime = session.drain_runtime_commands();
assert!(runtime.iter().any(|command| matches!(
command,
RuntimeCommand::ScheduleOnce { dispatch, .. }
if dispatch.event == INTERNAL_RENDER_FLUSH_EVENT
)));
// Sleep longer than the 25 ms cadence before delivering the flush event by hand.
thread::sleep(Duration::from_millis(30));
let flush = session.handle_client_message(ClientMessage::Event {
event: INTERNAL_RENDER_FLUSH_EVENT.to_string(),
target: None,
value: Value::Null,
metadata: Default::default(),
});
// The flush emits one patch showing both increments, at revision 2.
assert!(matches!(
flush.as_slice(),
[ServerMessage::Patch { html, revision, .. }]
if html == "<p>2</p>" && *revision == 2
));
}
/// Counter fixture that sets a 100 ms render cadence on mount and can clear
/// that override again through an event.
#[derive(Default)]
struct CadenceOverrideCounter {
    /// Incremented by both `inc` and `disable_cadence`.
    count: i64,
}

impl LiveView for CadenceOverrideCounter {
    /// Sets the render cadence to 100 ms.
    fn mount(&mut self, ctx: &mut Context) -> LiveResult {
        let cadence_ms = 100;
        ctx.set_render_cadence_ms(cadence_ms);
        Ok(())
    }

    /// `inc` increments the counter; `disable_cadence` first clears the
    /// cadence override and then increments as well. Other events do nothing.
    fn handle_event(&mut self, event: Event, ctx: &mut Context) -> LiveResult {
        let name = event.name.as_str();
        if name == "disable_cadence" {
            ctx.clear_render_cadence_override();
        }
        if matches!(name, "inc" | "disable_cadence") {
            self.count += 1;
        }
        Ok(())
    }

    /// Renders the current count inside a paragraph.
    fn render(&self) -> Html {
        let markup = format!("<p>{}</p>", self.count);
        Html::new(markup)
    }
}
/// With a 100 ms cadence the second `inc` yields no messages; once the view
/// clears its override the next event patches at once with the accumulated
/// count of 3 at revision 2.
#[test]
fn per_view_override_can_disable_render_cadence() {
    // Builds an untargeted event message carrying a null payload.
    let event = |name: &str| ClientMessage::Event {
        event: name.to_string(),
        target: None,
        value: Value::Null,
        metadata: Default::default(),
    };

    let mut session = LiveSession::new(Box::<CadenceOverrideCounter>::default(), "root");
    session.mount().unwrap();

    let first = session.handle_client_message(event("inc"));
    assert!(matches!(first.as_slice(), [ServerMessage::Patch { .. }]));

    let coalesced = session.handle_client_message(event("inc"));
    assert!(coalesced.is_empty());

    let disabled = session.handle_client_message(event("disable_cadence"));
    assert!(matches!(
        disabled.as_slice(),
        [ServerMessage::Patch { html, revision, .. }]
            if html == "<p>3</p>" && *revision == 2
    ));
}
/// Telemetry sink that rejects every event it is handed.
struct FailingTelemetrySink;

impl TelemetrySink for FailingTelemetrySink {
    /// Always fails with a fixed error message; the event is ignored.
    fn emit(&self, _event: TelemetryEvent) -> Result<(), String> {
        let reason = String::from("telemetry downstream unavailable");
        Err(reason)
    }
}
/// Mounting and event handling still produce a patch when the installed
/// telemetry sink returns an error from every `emit` call.
#[test]
fn telemetry_sink_failures_do_not_break_runtime_flow() {
    let mut session = LiveSession::new(Box::<Counter>::default(), "root");
    session.set_telemetry_sink(Arc::new(FailingTelemetrySink));
    session.mount().unwrap();

    let inc = ClientMessage::Event {
        event: "inc".to_string(),
        target: None,
        value: Value::Null,
        metadata: Default::default(),
    };
    let messages = session.handle_client_message(inc);
    assert!(matches!(messages.as_slice(), [ServerMessage::Patch { .. }]));
}
/// Component fixture: one to-do entry that can be toggled done or not done.
struct TodoItem {
    /// Component id; also used as the element id and event target in the markup.
    id: ComponentId,
    /// Text shown for the entry.
    title: String,
    /// Completion flag flipped by `toggle` events.
    done: bool,
}

impl TodoItem {
    /// Creates an entry that is not done, with the given id and title.
    fn new(id: &str, title: &str) -> Self {
        let id = ComponentId::new(id);
        let title = title.to_string();
        Self {
            id,
            title,
            done: false,
        }
    }
}

impl LiveComponent for TodoItem {
    /// Returns a copy of the component id.
    fn id(&self) -> ComponentId {
        self.id.clone()
    }

    /// Flips `done` on `toggle`; any other event is accepted unchanged.
    fn handle_event(&mut self, event: Event, _ctx: &mut Context) -> LiveResult {
        match event.name.as_str() {
            "toggle" => self.done = !self.done,
            _ => {}
        }
        Ok(())
    }

    /// Renders the entry as an `<li>` with a toggle button targeting this component.
    fn render(&self) -> Html {
        let id = &self.id;
        let done = self.done;
        let title = &self.title;
        let markup = format!(
            r#"<li id="{id}" data-done="{done}"><span>{title}</span><button shelly-click="toggle" shelly-target="{id}">Toggle</button></li>"#
        );
        Html::new(markup)
    }
}
/// View fixture that owns a list of `TodoItem` components.
struct TodoList {
    /// Components in render order.
    items: Vec<TodoItem>,
}

impl Default for TodoList {
    /// Starts with the two entries `todo-1` and `todo-2`.
    fn default() -> Self {
        let seed = [("todo-1", "Write docs"), ("todo-2", "Ship components")];
        let items = seed
            .iter()
            .map(|&(id, title)| TodoItem::new(id, title))
            .collect();
        Self { items }
    }
}

impl LiveView for TodoList {
    /// Routes the event to the item whose id equals `target` and returns that
    /// item's fresh render. Returns `Ok(None)` when no item matches, and
    /// propagates the item's error if its handler fails.
    fn handle_component_event(
        &mut self,
        target: &str,
        event: Event,
        ctx: &mut Context,
    ) -> LiveResult<Option<ComponentRender>> {
        let found = self
            .items
            .iter_mut()
            .find(|candidate| candidate.id().as_str() == target);
        match found {
            None => Ok(None),
            Some(item) => {
                item.handle_event(event, ctx)?;
                let rendered = ComponentRender::new(item.id(), item.render());
                Ok(Some(rendered))
            }
        }
    }

    /// Concatenates every item's markup inside a `<ul>`.
    fn render(&self) -> Html {
        let mut items = String::new();
        for item in &self.items {
            items.push_str(&item.render().into_string());
        }
        Html::new(format!(r#"<ul>{items}</ul>"#))
    }
}
/// A `toggle` event targeted at `todo-2` is answered with a single patch for
/// that component only, at revision 2.
#[test]
fn scoped_component_event_patches_only_target_component() {
    let mut session = LiveSession::new(Box::<TodoList>::default(), "root");
    session.mount().unwrap();

    let initial = session.render_patch();
    assert!(matches!(
        initial,
        ServerMessage::Patch { ref html, .. } if html.contains("todo-1")
    ));

    let toggle = ClientMessage::Event {
        event: "toggle".to_string(),
        target: Some("todo-2".to_string()),
        value: Value::Null,
        metadata: Default::default(),
    };
    let update = session.handle_client_message(toggle);

    let expected = ServerMessage::Patch {
        target: "todo-2".to_string(),
        html: r#"<li id="todo-2" data-done="true"><span>Ship components</span><button shelly-click="toggle" shelly-target="todo-2">Toggle</button></li>"#.to_string(),
        revision: 2,
    };
    assert_eq!(update.len(), 1);
    assert_eq!(update[0], expected);
}
/// Counter fixture used as a nested child view.
#[derive(Default)]
struct ChildCounter {
    /// Incremented once per handled `inc` event.
    count: i64,
}

impl LiveView for ChildCounter {
    /// Schedules a one-shot `tick` for target `child-a` after 10 ms.
    fn mount(&mut self, ctx: &mut Context) -> LiveResult {
        let payload = Value::Null;
        ctx.schedule_once_to("child-tick", 10, "child-a", "tick", payload);
        Ok(())
    }

    /// `inc` increments the counter, `fail` returns an event error, and any
    /// other event is accepted unchanged.
    fn handle_event(&mut self, event: Event, _ctx: &mut Context) -> LiveResult {
        match event.name.as_str() {
            "fail" => Err(ShellyError::Event("child boom".to_string())),
            "inc" => {
                self.count += 1;
                Ok(())
            }
            _ => Ok(()),
        }
    }

    /// Renders the count inside a `<section>` whose id is always `child-a`.
    fn render(&self) -> Html {
        let markup = format!(
            r#"<section id="child-a">child:{}</section>"#,
            self.count
        );
        Html::new(markup)
    }
}
/// Mounting a child yields its own patch and runtime commands, a targeted
/// event patches only the child, and the parent snapshot still shows the
/// parent's untouched state.
#[test]
fn nested_live_view_mounts_and_handles_scoped_events_without_parent_remount() {
    let mut session = LiveSession::new(Box::<Counter>::default(), "root");
    session.mount().unwrap();

    // Mounting the child produces a patch for the child target at revision 1.
    let child_patch = session
        .mount_child("child-a", Box::<ChildCounter>::default(), "child-a")
        .expect("child should mount");
    let expected_mount_patch = ServerMessage::Patch {
        target: "child-a".to_string(),
        html: r#"<section id="child-a">child:0</section>"#.to_string(),
        revision: 1,
    };
    assert_eq!(child_patch, expected_mount_patch);

    let state = session.child_state("child-a");
    assert_eq!(state, Some(NestedLiveViewState::Mounted));

    // The timer scheduled by the child's `mount` is queued on the session.
    let queued = session.drain_runtime_commands();
    let expected_timer = RuntimeCommand::ScheduleOnce {
        id: "child-tick".to_string(),
        delay_ms: 10,
        dispatch: crate::RuntimeEvent::with_target("child-a", "tick", Value::Null),
    };
    assert_eq!(queued, vec![expected_timer]);

    let child_inc = ClientMessage::Event {
        event: "inc".to_string(),
        target: Some("child-a".to_string()),
        value: Value::Null,
        metadata: Default::default(),
    };
    let update = session.handle_client_message(child_inc);
    let expected_update = vec![ServerMessage::Patch {
        target: "child-a".to_string(),
        html: r#"<section id="child-a">child:1</section>"#.to_string(),
        revision: 2,
    }];
    assert_eq!(update, expected_update);

    // The parent's own markup is unchanged by the child event.
    let parent_snapshot = session.render_snapshot_patch();
    assert!(matches!(
        parent_snapshot,
        ServerMessage::Patch { html, revision, .. }
            if html == "<p>0</p>" && revision == 2
    ));
}
/// Walks one child through suspend, resume and terminate, checking the
/// reported state, the error returned while suspended, and the revision
/// after each step.
#[test]
fn nested_live_view_lifecycle_suspend_resume_and_terminate_are_isolated() {
    let mut session = LiveSession::new(Box::<Counter>::default(), "root");
    session.mount().unwrap();
    let _ = session
        .mount_child("child-a", Box::<ChildCounter>::default(), "child-a")
        .expect("child should mount");

    session.suspend_child("child-a").expect("suspend child");
    let suspended_state = session.child_state("child-a");
    assert_eq!(suspended_state, Some(NestedLiveViewState::Suspended));

    // Events aimed at a suspended child are rejected and do not bump the revision.
    let child_inc = ClientMessage::Event {
        event: "inc".to_string(),
        target: Some("child-a".to_string()),
        value: Value::Null,
        metadata: Default::default(),
    };
    let blocked = session.handle_client_message(child_inc);
    assert!(matches!(
        blocked.as_slice(),
        [ServerMessage::Error { code: Some(code), .. }]
            if code == "nested_live_view_unavailable"
    ));
    assert_eq!(session.revision(), 1);

    let resumed = session.resume_child("child-a").expect("resume child");
    assert!(matches!(
        resumed,
        ServerMessage::Patch { target, revision, .. }
            if target == "child-a" && revision == 2
    ));

    // Terminating sends an empty patch for the child target.
    let terminated = session.terminate_child("child-a").expect("terminate child");
    let expected_clear = ServerMessage::Patch {
        target: "child-a".to_string(),
        html: String::new(),
        revision: 3,
    };
    assert_eq!(terminated, expected_clear);

    let final_state = session.child_state("child-a");
    assert_eq!(final_state, Some(NestedLiveViewState::Terminated));
}
/// A failing event in one child returns an error without advancing the
/// revision; the sibling child and the parent then handle events normally.
#[test]
fn nested_live_view_event_failure_does_not_corrupt_parent_or_siblings() {
    // Builds an event message with a null payload and an optional target.
    let event = |name: &str, target: Option<&str>| ClientMessage::Event {
        event: name.to_string(),
        target: target.map(str::to_string),
        value: Value::Null,
        metadata: Default::default(),
    };

    let mut session = LiveSession::new(Box::<Counter>::default(), "root");
    session.mount().unwrap();
    let _ = session
        .mount_child("child-a", Box::<ChildCounter>::default(), "child-a")
        .expect("child a should mount");
    let _ = session
        .mount_child("child-b", Box::<ChildCounter>::default(), "child-b")
        .expect("child b should mount");

    let failure = session.handle_client_message(event("fail", Some("child-a")));
    assert!(matches!(
        failure.as_slice(),
        [ServerMessage::Error { code: Some(code), .. }]
            if code == "nested_event_failed"
    ));
    // Revision is still 2, the value reached after the two child mounts.
    assert_eq!(session.revision(), 2);

    let sibling = session.handle_client_message(event("inc", Some("child-b")));
    assert!(matches!(
        sibling.as_slice(),
        [ServerMessage::Patch { target, revision, .. }]
            if target == "child-b" && *revision == 3
    ));

    let parent = session.handle_client_message(event("inc", None));
    assert!(matches!(
        parent.as_slice(),
        [ServerMessage::Patch { target, html, revision }]
            if target == "root" && html == "<p>1</p>" && *revision == 4
    ));
}
/// After a mount and one `inc` event, the in-memory sink has recorded at
/// least one `Mount`, one `HandleEvent` and one `Patch` telemetry event.
#[test]
fn session_emits_mount_and_event_telemetry() {
    let telemetry = Arc::new(MemoryTelemetrySink::new());
    let mut session = LiveSession::new(Box::<Counter>::default(), "root");
    session.set_telemetry_sink(telemetry.clone());
    session.mount().unwrap();

    let inc = ClientMessage::Event {
        event: "inc".to_string(),
        target: None,
        value: Value::Null,
        metadata: Default::default(),
    };
    let _ = session.handle_client_message(inc);

    let events = telemetry.events();
    // True when at least one recorded event has the given kind.
    let recorded = |kind: TelemetryEventKind| events.iter().any(|event| event.kind == kind);
    assert!(recorded(TelemetryEventKind::Mount));
    assert!(recorded(TelemetryEventKind::HandleEvent));
    assert!(recorded(TelemetryEventKind::Patch));
}
/// Drives the public helper surface of a routed session: route accessors,
/// `render_html`, `hello`, `patch_route`, trace capture and snapshot patches.
#[test]
fn session_public_helpers_cover_route_hello_snapshot_and_trace_paths() {
    let route_params = BTreeMap::from([("team".to_string(), "core".to_string())]);
    let mut session = LiveSession::new_with_route(
        Box::<Counter>::default(),
        "root",
        "/teams/core",
        route_params.clone(),
    );
    assert_eq!(session.route_path(), "/teams/core");
    assert_eq!(session.route_params(), &route_params);
    assert_eq!(session.render_html().as_str(), "<p>0</p>");

    session.set_default_render_cadence_ms(10);
    let hello = session.hello();
    assert!(matches!(
        hello,
        ServerMessage::Hello { ref target, revision, .. } if target == "root" && revision == 0
    ));

    // Patching the route replaces both the path and the params.
    let patched_params = BTreeMap::from([("team".to_string(), "platform".to_string())]);
    session
        .patch_route("/teams/platform", patched_params.clone())
        .unwrap();
    assert_eq!(session.route_path(), "/teams/platform");
    assert_eq!(session.route_params(), &patched_params);

    session.enable_trace_capture(TraceRedactionPolicy::none());
    assert!(session.trace_capture_enabled());
    let inc = ClientMessage::Event {
        event: "inc".to_string(),
        target: None,
        value: Value::Null,
        metadata: Default::default(),
    };
    let _ = session.handle_client_message(inc);
    assert!(session.trace_artifact().is_some());

    let snapshot = session.render_snapshot_patch();
    assert!(matches!(
        snapshot,
        ServerMessage::Patch { ref html, revision, .. } if html == "<p>1</p>" && revision == session.revision()
    ));

    // Taking the artifact turns capture off; disabling afterwards leaves no artifact.
    let taken = session.take_trace_artifact();
    assert!(taken.is_some());
    assert!(!session.trace_capture_enabled());
    session.disable_trace_capture();
    assert!(session.trace_artifact().is_none());
}
/// Fixture whose `mount` queues several side effects and then fails.
#[derive(Default)]
struct MountFailureView;

impl LiveView for MountFailureView {
    /// Requests a redirect, a subscription and a one-shot timer, then returns
    /// an event error.
    fn mount(&mut self, ctx: &mut Context) -> LiveResult {
        ctx.redirect("/mount-failure");
        ctx.subscribe("alerts");
        ctx.schedule_once("retry", 10, "retry", Value::Null);
        let failure = ShellyError::Event("mount failed".to_string());
        Err(failure)
    }

    /// Renders fixed markup.
    fn render(&self) -> Html {
        let markup = "<p>mount-failure</p>";
        Html::new(markup)
    }
}
/// When `mount` fails, the error is returned to the caller, no pubsub or
/// runtime commands remain queued, and the revision stays at zero.
#[test]
fn mount_failure_drains_side_effects_and_keeps_session_safe() {
    let mut session = LiveSession::new(Box::<MountFailureView>::default(), "root");

    let message = session.mount().unwrap_err().to_string();
    assert!(message.contains("mount failed"));

    let pubsub = session.drain_pubsub_commands();
    assert!(pubsub.is_empty());
    let runtime = session.drain_runtime_commands();
    assert!(runtime.is_empty());
    assert_eq!(session.revision(), 0);
}
/// Fixture whose component event handler always fails.
#[derive(Default)]
struct ComponentErrorView;

impl LiveView for ComponentErrorView {
    /// Rejects every component event with an event error, ignoring its arguments.
    fn handle_component_event(
        &mut self,
        _target: &str,
        _event: Event,
        _ctx: &mut Context,
    ) -> LiveResult<Option<ComponentRender>> {
        let failure = ShellyError::Event("component handler failed".to_string());
        Err(failure)
    }

    /// Renders fixed markup.
    fn render(&self) -> Html {
        let markup = "<p>component</p>";
        Html::new(markup)
    }
}
/// Covers several protocol branches: URL patch and navigate are echoed back,
/// upload messages are rejected as unsupported, a blank tenant id on connect
/// leaves the tenant unset, and a failing component handler reports
/// `event_failed`.
#[test]
fn protocol_paths_cover_navigation_upload_and_component_error_branches() {
    let mut session = LiveSession::new(Box::<Counter>::default(), "root");
    session.mount().unwrap();

    let patch_url = session.handle_client_message(ClientMessage::PatchUrl {
        to: "/inbox".to_string(),
    });
    let expected_patch_url = vec![ServerMessage::PatchUrl {
        to: "/inbox".to_string(),
    }];
    assert_eq!(patch_url, expected_patch_url);

    let navigate = session.handle_client_message(ClientMessage::Navigate {
        to: "/settings".to_string(),
    });
    let expected_navigate = vec![ServerMessage::Navigate {
        to: "/settings".to_string(),
    }];
    assert_eq!(navigate, expected_navigate);

    // Every stage of an upload is answered with the same error code.
    let upload_messages = [
        ClientMessage::UploadStart {
            upload_id: "up-1".to_string(),
            event: "upload".to_string(),
            target: None,
            name: "avatar.png".to_string(),
            size: 12,
            content_type: Some("image/png".to_string()),
        },
        ClientMessage::UploadChunk {
            upload_id: "up-1".to_string(),
            offset: 0,
            data: "aaa".to_string(),
        },
        ClientMessage::UploadComplete {
            upload_id: "up-1".to_string(),
        },
    ];
    for message in upload_messages {
        let response = session.handle_client_message(message);
        assert!(matches!(
            response.as_slice(),
            [ServerMessage::Error { code: Some(code), .. }] if code == "unsupported_upload_transport"
        ));
    }

    // A tenant id made of a single space must not be stored on the session.
    let connect = ClientMessage::Connect {
        protocol: "shelly/1".to_string(),
        session_id: None,
        last_revision: None,
        resume_token: None,
        tenant_id: Some(" ".to_string()),
        trace_id: None,
        span_id: None,
        parent_span_id: None,
        correlation_id: None,
        request_id: None,
    };
    let _ = session.handle_client_message(connect);
    assert_eq!(session.tenant_id(), None);

    let mut component_session = LiveSession::new(Box::<ComponentErrorView>::default(), "root");
    component_session.mount().unwrap();
    let toggle = ClientMessage::Event {
        event: "toggle".to_string(),
        target: Some("todo-1".to_string()),
        value: Value::Null,
        metadata: Default::default(),
    };
    let component_failure = component_session.handle_client_message(toggle);
    assert!(matches!(
        component_failure.as_slice(),
        [ServerMessage::Error { code: Some(code), .. }] if code == "event_failed"
    ));
}
/// Fixture that sends one mixed insert/delete stream batch on request.
#[derive(Default)]
struct StreamInsertDeleteBatchView;

impl LiveView for StreamInsertDeleteBatchView {
    /// On `batch`, streams an append of `item-1` followed by a delete of
    /// `item-0` to `items` as one batch. Other events do nothing.
    fn handle_event(&mut self, event: Event, ctx: &mut Context) -> LiveResult {
        if event.name != "batch" {
            return Ok(());
        }
        let insert = StreamBatchOperation::Insert {
            id: "item-1".to_string(),
            html: "<li>Item 1</li>".to_string(),
            at: crate::StreamPosition::Append,
        };
        let delete = StreamBatchOperation::Delete {
            id: "item-0".to_string(),
        };
        ctx.stream_batch("items", vec![insert, delete]);
        Ok(())
    }

    /// The rendered markup is always the empty list container.
    fn render(&self) -> Html {
        Html::new("<ul id=\"items\"></ul>")
    }
}
/// A mixed batch is answered with one `StreamBatch`, a flush event with
/// nothing pending yields no messages, and telemetry records one stream
/// insert and one stream delete.
#[test]
fn stream_batch_delete_telemetry_and_empty_flush_paths_are_exercised() {
    // Builds an untargeted event message carrying a null payload.
    let event = |name: &str| ClientMessage::Event {
        event: name.to_string(),
        target: None,
        value: Value::Null,
        metadata: Default::default(),
    };

    let telemetry = Arc::new(MemoryTelemetrySink::new());
    let mut session = LiveSession::new(Box::<StreamInsertDeleteBatchView>::default(), "root");
    session.set_telemetry_sink(telemetry.clone());
    session.mount().unwrap();
    let _ = session.render_patch();

    let batch = session.handle_client_message(event("batch"));
    assert!(matches!(
        batch.as_slice(),
        [ServerMessage::StreamBatch { .. }]
    ));

    let empty_flush = session.handle_client_message(event(INTERNAL_RENDER_FLUSH_EVENT));
    assert!(empty_flush.is_empty());

    let telemetry_events = telemetry.events();
    // True when an event of the given kind was recorded with a count of exactly one.
    let recorded_once = |kind: TelemetryEventKind| {
        telemetry_events
            .iter()
            .any(|event| event.kind == kind && event.count == Some(1))
    };
    assert!(recorded_once(TelemetryEventKind::StreamInsert));
    assert!(recorded_once(TelemetryEventKind::StreamDelete));
}
}