use crate::{ClientMessage, LiveSession, LiveView, ServerMessage, ShellyError};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::{BTreeMap, BTreeSet};
use std::time::{SystemTime, UNIX_EPOCH};
pub const REPLAY_TRACE_FORMAT_VERSION: &str = "shelly-replay-trace/v1";
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SessionReplayMetadata {
pub protocol: String,
pub session_id: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tenant_id: Option<String>,
pub target_id: String,
pub route_path: String,
#[serde(default)]
pub route_params: BTreeMap<String, String>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SessionReplayTraceStep {
pub sequence: u64,
pub recorded_at_unix_ms: u64,
pub revision_before: u64,
pub revision_after: u64,
pub client_message: ClientMessage,
pub server_messages: Vec<ServerMessage>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TraceRedactionSummary {
pub redact_server_html: bool,
pub redacted_text: String,
pub keys: Vec<String>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SessionReplayTrace {
pub format_version: String,
pub captured_at_unix_ms: u64,
pub metadata: SessionReplayMetadata,
pub redaction: TraceRedactionSummary,
pub steps: Vec<SessionReplayTraceStep>,
}
impl SessionReplayTrace {
pub fn from_json(raw: &str) -> Result<Self, serde_json::Error> {
serde_json::from_str(raw)
}
pub fn to_json_pretty(&self) -> Result<String, serde_json::Error> {
serde_json::to_string_pretty(self)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TraceRedactionPolicy {
keys: BTreeSet<String>,
redacted_text: String,
redact_server_html: bool,
}
impl Default for TraceRedactionPolicy {
fn default() -> Self {
Self::developer_default()
}
}
impl TraceRedactionPolicy {
pub fn none() -> Self {
Self {
keys: BTreeSet::new(),
redacted_text: "<redacted>".to_string(),
redact_server_html: false,
}
}
pub fn developer_default() -> Self {
Self {
keys: default_sensitive_keys()
.into_iter()
.map(normalize_key)
.collect(),
redacted_text: "<redacted>".to_string(),
redact_server_html: false,
}
}
pub fn production_safe() -> Self {
let mut keys = default_sensitive_keys()
.into_iter()
.map(normalize_key)
.collect::<BTreeSet<_>>();
for extra in ["email", "phone", "ssn", "data", "html"] {
keys.insert(normalize_key(extra));
}
Self {
keys,
redacted_text: "<redacted>".to_string(),
redact_server_html: true,
}
}
pub fn with_key(mut self, key: impl Into<String>) -> Self {
self.keys.insert(normalize_key(key.into()));
self
}
pub fn with_redacted_text(mut self, value: impl Into<String>) -> Self {
self.redacted_text = value.into();
self
}
pub fn with_redact_server_html(mut self, enabled: bool) -> Self {
self.redact_server_html = enabled;
self
}
fn summary(&self) -> TraceRedactionSummary {
TraceRedactionSummary {
redact_server_html: self.redact_server_html,
redacted_text: self.redacted_text.clone(),
keys: self.keys.iter().cloned().collect(),
}
}
fn from_summary(summary: &TraceRedactionSummary) -> Self {
Self {
keys: summary
.keys
.iter()
.map(normalize_key)
.collect::<BTreeSet<_>>(),
redacted_text: summary.redacted_text.clone(),
redact_server_html: summary.redact_server_html,
}
}
fn should_redact(&self, key: &str) -> bool {
self.keys.contains(&normalize_key(key))
}
fn redact_option_string(&self, key: &str, value: &mut Option<String>) {
if self.should_redact(key) && value.is_some() {
*value = Some(self.redacted_text.clone());
}
}
fn redact_string(&self, key: &str, value: &mut String) {
if self.should_redact(key) {
*value = self.redacted_text.clone();
}
}
fn redact_json_value(&self, key: Option<&str>, value: &mut Value) {
if let Some(current_key) = key {
if self.should_redact(current_key) {
*value = Value::String(self.redacted_text.clone());
return;
}
}
match value {
Value::Object(map) => {
for (nested_key, nested_value) in map {
self.redact_json_value(Some(nested_key.as_str()), nested_value);
}
}
Value::Array(items) => {
for item in items {
self.redact_json_value(None, item);
}
}
_ => {}
}
}
fn redact_client_message(&self, mut message: ClientMessage) -> ClientMessage {
match &mut message {
ClientMessage::Connect {
resume_token,
trace_id,
span_id,
parent_span_id,
correlation_id,
request_id,
..
} => {
self.redact_option_string("resume_token", resume_token);
self.redact_option_string("trace_id", trace_id);
self.redact_option_string("span_id", span_id);
self.redact_option_string("parent_span_id", parent_span_id);
self.redact_option_string("correlation_id", correlation_id);
self.redact_option_string("request_id", request_id);
}
ClientMessage::Event {
value, metadata, ..
} => {
self.redact_json_value(None, value);
for (key, value) in metadata {
self.redact_json_value(Some(key.as_str()), value);
}
}
ClientMessage::UploadStart {
upload_id,
name,
content_type,
..
} => {
self.redact_string("upload_id", upload_id);
self.redact_string("name", name);
self.redact_option_string("content_type", content_type);
}
ClientMessage::UploadChunk {
upload_id, data, ..
} => {
self.redact_string("upload_id", upload_id);
self.redact_string("data", data);
}
ClientMessage::UploadComplete { upload_id } => {
self.redact_string("upload_id", upload_id);
}
ClientMessage::Ping { .. }
| ClientMessage::PatchUrl { .. }
| ClientMessage::Navigate { .. } => {}
}
message
}
fn redact_server_message(&self, mut message: ServerMessage) -> ServerMessage {
match &mut message {
ServerMessage::Hello {
resume_token,
session_id,
..
} => {
self.redact_option_string("resume_token", resume_token);
self.redact_string("session_id", session_id);
}
ServerMessage::Patch { html, .. } => {
if self.redact_server_html || self.should_redact("html") {
*html = self.redacted_text.clone();
}
}
ServerMessage::Diff { slots, .. } => {
if self.redact_server_html || self.should_redact("html") {
for slot in slots {
slot.html = self.redacted_text.clone();
}
}
}
ServerMessage::StreamInsert { html, .. } => {
if self.redact_server_html || self.should_redact("html") {
*html = self.redacted_text.clone();
}
}
ServerMessage::UploadComplete {
upload_id,
name,
content_type,
..
} => {
self.redact_string("upload_id", upload_id);
self.redact_string("name", name);
self.redact_option_string("content_type", content_type);
}
ServerMessage::UploadError {
upload_id, message, ..
} => {
self.redact_string("upload_id", upload_id);
self.redact_string("message", message);
}
ServerMessage::Error { message, .. } => {
self.redact_string("message", message);
}
ServerMessage::Pong { .. }
| ServerMessage::Redirect { .. }
| ServerMessage::PatchUrl { .. }
| ServerMessage::Navigate { .. }
| ServerMessage::StreamDelete { .. }
| ServerMessage::StreamBatch { .. }
| ServerMessage::ChartSeriesAppend { .. }
| ServerMessage::ChartSeriesAppendMany { .. }
| ServerMessage::ChartSeriesReplace { .. }
| ServerMessage::ChartReset { .. }
| ServerMessage::ChartAnnotationUpsert { .. }
| ServerMessage::ChartAnnotationDelete { .. }
| ServerMessage::ToastPush { .. }
| ServerMessage::ToastDismiss { .. }
| ServerMessage::InboxUpsert { .. }
| ServerMessage::InboxDelete { .. }
| ServerMessage::GridReplace { .. }
| ServerMessage::GridRowsReplace { .. }
| ServerMessage::InteropDispatch { .. }
| ServerMessage::UploadProgress { .. } => {}
}
message
}
}
#[derive(Debug, Clone)]
pub struct SessionTraceRecorder {
policy: TraceRedactionPolicy,
artifact: SessionReplayTrace,
next_sequence: u64,
}
impl SessionTraceRecorder {
pub fn new(metadata: SessionReplayMetadata, policy: TraceRedactionPolicy) -> Self {
Self {
policy: policy.clone(),
artifact: SessionReplayTrace {
format_version: REPLAY_TRACE_FORMAT_VERSION.to_string(),
captured_at_unix_ms: now_unix_ms(),
metadata,
redaction: policy.summary(),
steps: Vec::new(),
},
next_sequence: 1,
}
}
pub fn record_turn(
&mut self,
client_message: &ClientMessage,
server_messages: &[ServerMessage],
revision_before: u64,
revision_after: u64,
) {
let client = self.policy.redact_client_message(client_message.clone());
let server = server_messages
.iter()
.cloned()
.map(|message| self.policy.redact_server_message(message))
.collect::<Vec<_>>();
self.artifact.steps.push(SessionReplayTraceStep {
sequence: self.next_sequence,
recorded_at_unix_ms: now_unix_ms(),
revision_before,
revision_after,
client_message: client,
server_messages: server,
});
self.next_sequence += 1;
}
pub fn artifact(&self) -> SessionReplayTrace {
self.artifact.clone()
}
pub fn into_artifact(self) -> SessionReplayTrace {
self.artifact
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ReplayStepStatus {
Match,
Mismatch,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ReplayStepResult {
pub sequence: u64,
pub status: ReplayStepStatus,
pub mismatch_reason: Option<String>,
pub expected_revision_before: u64,
pub expected_revision_after: u64,
pub actual_revision_before: u64,
pub actual_revision_after: u64,
pub client_message: ClientMessage,
pub expected_server_messages: Vec<ServerMessage>,
pub actual_server_messages: Vec<ServerMessage>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ReplayReport {
pub format_version: String,
pub metadata: SessionReplayMetadata,
pub total_steps: usize,
pub matched_steps: usize,
pub first_mismatch_sequence: Option<u64>,
pub revision_monotonic: bool,
pub final_revision: u64,
pub steps: Vec<ReplayStepResult>,
}
impl ReplayReport {
pub fn passed(&self) -> bool {
self.first_mismatch_sequence.is_none()
&& self.revision_monotonic
&& self.matched_steps == self.total_steps
}
}
pub fn replay_trace<F>(
trace: &SessionReplayTrace,
mut view_factory: F,
) -> Result<ReplayReport, ShellyError>
where
F: FnMut() -> Box<dyn LiveView>,
{
let comparison_policy = TraceRedactionPolicy::from_summary(&trace.redaction);
let metadata = &trace.metadata;
let mut session = LiveSession::new_with_route_and_session_id(
view_factory(),
metadata.session_id.clone(),
metadata.target_id.clone(),
metadata.route_path.clone(),
metadata.route_params.clone(),
);
session.mount()?;
let mut steps = Vec::with_capacity(trace.steps.len());
let mut matched_steps = 0usize;
let mut first_mismatch_sequence = None;
let mut revision_monotonic = true;
let mut previous_server_revision = 0u64;
for expected_step in &trace.steps {
let actual_revision_before = session.revision();
let actual_messages_raw =
session.handle_client_message(expected_step.client_message.clone());
let actual_revision_after = session.revision();
let actual_messages = actual_messages_raw
.iter()
.cloned()
.map(|message| comparison_policy.redact_server_message(message))
.collect::<Vec<_>>();
let mut mismatch_reasons = Vec::new();
if expected_step.revision_before != actual_revision_before {
mismatch_reasons.push(format!(
"revision_before mismatch: expected {}, got {}",
expected_step.revision_before, actual_revision_before
));
}
if expected_step.revision_after != actual_revision_after {
mismatch_reasons.push(format!(
"revision_after mismatch: expected {}, got {}",
expected_step.revision_after, actual_revision_after
));
}
if expected_step.server_messages != actual_messages {
mismatch_reasons.push("server_messages mismatch".to_string());
}
if actual_revision_after < actual_revision_before {
revision_monotonic = false;
mismatch_reasons.push("session revision regressed".to_string());
}
match validate_server_revisions(previous_server_revision, &actual_messages_raw) {
Ok(next_revision) => {
previous_server_revision = next_revision;
}
Err(reason) => {
revision_monotonic = false;
mismatch_reasons.push(reason);
}
}
let status = if mismatch_reasons.is_empty() {
matched_steps += 1;
ReplayStepStatus::Match
} else {
if first_mismatch_sequence.is_none() {
first_mismatch_sequence = Some(expected_step.sequence);
}
ReplayStepStatus::Mismatch
};
let mismatch_reason = if mismatch_reasons.is_empty() {
None
} else {
Some(mismatch_reasons.join("; "))
};
steps.push(ReplayStepResult {
sequence: expected_step.sequence,
status,
mismatch_reason,
expected_revision_before: expected_step.revision_before,
expected_revision_after: expected_step.revision_after,
actual_revision_before,
actual_revision_after,
client_message: expected_step.client_message.clone(),
expected_server_messages: expected_step.server_messages.clone(),
actual_server_messages: actual_messages,
});
}
Ok(ReplayReport {
format_version: trace.format_version.clone(),
metadata: trace.metadata.clone(),
total_steps: trace.steps.len(),
matched_steps,
first_mismatch_sequence,
revision_monotonic,
final_revision: session.revision(),
steps,
})
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct TimeTravelFrame {
pub sequence: u64,
pub status: ReplayStepStatus,
pub client_kind: String,
pub client_summary: String,
pub expected_revision_after: u64,
pub actual_revision_after: u64,
pub expected_server_count: usize,
pub actual_server_count: usize,
pub mismatch_reason: Option<String>,
pub client_message: ClientMessage,
pub expected_server_messages: Vec<ServerMessage>,
pub actual_server_messages: Vec<ServerMessage>,
}
#[derive(Debug, Clone, Default)]
pub struct TimeTravelInspector {
frames: Vec<TimeTravelFrame>,
cursor: usize,
}
impl TimeTravelInspector {
pub fn from_report(report: &ReplayReport) -> Self {
let frames = report
.steps
.iter()
.map(|step| TimeTravelFrame {
sequence: step.sequence,
status: step.status.clone(),
client_kind: client_message_kind(&step.client_message).to_string(),
client_summary: client_message_summary(&step.client_message),
expected_revision_after: step.expected_revision_after,
actual_revision_after: step.actual_revision_after,
expected_server_count: step.expected_server_messages.len(),
actual_server_count: step.actual_server_messages.len(),
mismatch_reason: step.mismatch_reason.clone(),
client_message: step.client_message.clone(),
expected_server_messages: step.expected_server_messages.clone(),
actual_server_messages: step.actual_server_messages.clone(),
})
.collect::<Vec<_>>();
Self { frames, cursor: 0 }
}
pub fn len(&self) -> usize {
self.frames.len()
}
pub fn is_empty(&self) -> bool {
self.frames.is_empty()
}
pub fn cursor(&self) -> usize {
self.cursor
}
pub fn frames(&self) -> &[TimeTravelFrame] {
&self.frames
}
pub fn current(&self) -> Option<&TimeTravelFrame> {
self.frames.get(self.cursor)
}
pub fn step_to(&mut self, index: usize) -> Option<&TimeTravelFrame> {
if index < self.frames.len() {
self.cursor = index;
self.frames.get(self.cursor)
} else {
None
}
}
#[allow(clippy::should_implement_trait)]
pub fn next(&mut self) -> Option<&TimeTravelFrame> {
if self.cursor + 1 < self.frames.len() {
self.cursor += 1;
}
self.frames.get(self.cursor)
}
pub fn previous(&mut self) -> Option<&TimeTravelFrame> {
if self.cursor > 0 {
self.cursor -= 1;
}
self.frames.get(self.cursor)
}
}
fn client_message_kind(message: &ClientMessage) -> &'static str {
match message {
ClientMessage::Connect { .. } => "connect",
ClientMessage::Event { .. } => "event",
ClientMessage::Ping { .. } => "ping",
ClientMessage::PatchUrl { .. } => "patch_url",
ClientMessage::Navigate { .. } => "navigate",
ClientMessage::UploadStart { .. } => "upload_start",
ClientMessage::UploadChunk { .. } => "upload_chunk",
ClientMessage::UploadComplete { .. } => "upload_complete",
}
}
fn client_message_summary(message: &ClientMessage) -> String {
match message {
ClientMessage::Event { event, target, .. } => {
if let Some(target) = target {
format!("{event} -> {target}")
} else {
event.clone()
}
}
ClientMessage::PatchUrl { to } => format!("patch_url {to}"),
ClientMessage::Navigate { to } => format!("navigate {to}"),
ClientMessage::Ping { .. } => "ping".to_string(),
ClientMessage::Connect { .. } => "connect".to_string(),
ClientMessage::UploadStart { upload_id, .. } => format!("upload_start {upload_id}"),
ClientMessage::UploadChunk { upload_id, .. } => format!("upload_chunk {upload_id}"),
ClientMessage::UploadComplete { upload_id } => format!("upload_complete {upload_id}"),
}
}
fn validate_server_revisions(
mut previous_revision: u64,
messages: &[ServerMessage],
) -> Result<u64, String> {
for message in messages {
let current_revision = match message {
ServerMessage::Patch { revision, .. } => Some(*revision),
ServerMessage::Diff { revision, .. } => Some(*revision),
_ => None,
};
if let Some(current_revision) = current_revision {
if current_revision <= previous_revision {
return Err(format!(
"non-monotonic server revision: previous={}, next={}",
previous_revision, current_revision
));
}
previous_revision = current_revision;
}
}
Ok(previous_revision)
}
fn normalize_key(input: impl AsRef<str>) -> String {
input.as_ref().trim().to_lowercase().replace('-', "_")
}
fn default_sensitive_keys() -> Vec<&'static str> {
vec![
"password",
"passphrase",
"secret",
"token",
"resume_token",
"csrf",
"authorization",
"cookie",
"api_key",
"access_token",
"refresh_token",
"id_token",
]
}
fn now_unix_ms() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|duration| duration.as_millis() as u64)
.unwrap_or(0)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{Context, Event, Html, LiveResult};
use serde_json::json;
#[derive(Default)]
struct CounterView {
count: i64,
}
impl LiveView for CounterView {
fn mount(&mut self, _ctx: &mut Context) -> LiveResult {
self.count = 0;
Ok(())
}
fn handle_event(&mut self, event: Event, _ctx: &mut Context) -> LiveResult {
match event.name.as_str() {
"inc" => self.count += 1,
"dec" => self.count -= 1,
_ => {}
}
Ok(())
}
fn render(&self) -> Html {
Html::new(format!("<p>Count: {}</p>", self.count))
}
}
fn build_trace(policy: TraceRedactionPolicy) -> SessionReplayTrace {
let mut session = LiveSession::new(Box::<CounterView>::default(), "root");
session.mount().expect("mount trace session");
session.enable_trace_capture(policy);
let first = ClientMessage::Event {
event: "inc".to_string(),
target: None,
value: json!({
"password": "super-secret",
"nested": {"token": "abc"},
"n": 1
}),
metadata: serde_json::Map::from_iter([(
"authorization".to_string(),
Value::String("Bearer 123".to_string()),
)]),
};
let second = ClientMessage::Event {
event: "inc".to_string(),
target: None,
value: json!({}),
metadata: serde_json::Map::new(),
};
let _ = session.handle_client_message(first);
let _ = session.handle_client_message(second);
session.take_trace_artifact().expect("trace artifact")
}
#[test]
fn replay_trace_reproduces_session_without_live_dependencies() {
let trace = build_trace(TraceRedactionPolicy::developer_default());
let report = replay_trace(&trace, || Box::<CounterView>::default()).expect("replay report");
assert!(report.passed(), "replay mismatches: {:#?}", report);
assert_eq!(report.total_steps, 2);
assert_eq!(report.final_revision, 2);
}
#[test]
fn replay_trace_round_trips_json_and_empty_reports() {
let metadata = SessionReplayMetadata {
protocol: crate::PROTOCOL_VERSION_V1.to_string(),
session_id: "sid-1".to_string(),
tenant_id: Some("tenant-a".to_string()),
target_id: "root".to_string(),
route_path: "/".to_string(),
route_params: BTreeMap::new(),
};
let recorder = SessionTraceRecorder::new(metadata, TraceRedactionPolicy::none());
let artifact = recorder.into_artifact();
assert_eq!(artifact.steps.len(), 0);
let json = artifact.to_json_pretty().expect("serialize trace");
let decoded = SessionReplayTrace::from_json(&json).expect("deserialize trace");
assert_eq!(decoded, artifact);
let report = replay_trace(&decoded, || Box::<CounterView>::default()).expect("replay");
assert!(report.passed());
assert_eq!(report.total_steps, 0);
assert_eq!(report.matched_steps, 0);
let mut inspector = TimeTravelInspector::from_report(&report);
assert!(inspector.is_empty());
assert_eq!(inspector.len(), 0);
assert!(inspector.current().is_none());
assert!(inspector.next().is_none());
assert!(inspector.previous().is_none());
assert!(inspector.step_to(0).is_none());
assert_eq!(inspector.cursor(), 0);
assert!(inspector.frames().is_empty());
}
#[test]
fn replay_trace_detects_mismatch_and_revision_issues() {
let mut trace = build_trace(TraceRedactionPolicy::developer_default());
trace.steps[1].revision_after = 99;
let report = replay_trace(&trace, || Box::<CounterView>::default()).expect("replay report");
assert!(!report.passed());
assert_eq!(report.first_mismatch_sequence, Some(2));
}
#[test]
fn validate_server_revisions_rejects_regressions() {
let err = validate_server_revisions(
3,
&[ServerMessage::Patch {
target: "root".to_string(),
html: "<p>regress</p>".to_string(),
revision: 2,
}],
)
.expect_err("revision regression should fail");
assert!(err.contains("non-monotonic server revision"));
let next = validate_server_revisions(
2,
&[ServerMessage::Diff {
target: "root".to_string(),
revision: 3,
slots: vec![],
}],
)
.expect("revision should advance");
assert_eq!(next, 3);
}
#[test]
fn redaction_policy_covers_connect_upload_and_server_variants() {
let metadata = SessionReplayMetadata {
protocol: crate::PROTOCOL_VERSION_V1.to_string(),
session_id: "sid-1".to_string(),
tenant_id: None,
target_id: "root".to_string(),
route_path: "/".to_string(),
route_params: BTreeMap::new(),
};
let policy = TraceRedactionPolicy::none()
.with_key("resume-token")
.with_key("trace_id")
.with_key("span_id")
.with_key("parent_span_id")
.with_key("correlation_id")
.with_key("request_id")
.with_key("upload_id")
.with_key("name")
.with_key("content_type")
.with_key("message")
.with_key("session_id")
.with_key("data")
.with_redacted_text("<mask>")
.with_redact_server_html(true);
let mut recorder = SessionTraceRecorder::new(metadata, policy);
let client = ClientMessage::Connect {
protocol: crate::PROTOCOL_VERSION_V1.to_string(),
session_id: Some("sid-1".to_string()),
last_revision: Some(5),
resume_token: Some("resume".to_string()),
tenant_id: Some("tenant-1".to_string()),
trace_id: Some("4bf92f3577b34da6a3ce929d0e0e4736".to_string()),
span_id: Some("00f067aa0ba902b7".to_string()),
parent_span_id: Some("89abcdef01234567".to_string()),
correlation_id: Some("corr-1".to_string()),
request_id: Some("req-1".to_string()),
};
let server = vec![
ServerMessage::Hello {
session_id: "sid-1".to_string(),
target: "root".to_string(),
revision: 0,
protocol: crate::PROTOCOL_VERSION_V1.to_string(),
server_revision: Some(0),
resume_status: Some(crate::ResumeStatus::Fresh),
resume_reason: None,
resume_token: Some("resume".to_string()),
resume_expires_in_ms: Some(120_000),
},
ServerMessage::Patch {
target: "root".to_string(),
html: "<p>private</p>".to_string(),
revision: 1,
},
ServerMessage::Diff {
target: "root".to_string(),
revision: 2,
slots: vec![crate::DynamicSlotPatch {
index: 0,
html: "<span>hidden</span>".to_string(),
}],
},
ServerMessage::StreamInsert {
target: "items".to_string(),
id: "item-1".to_string(),
html: "<li>secret</li>".to_string(),
at: crate::StreamPosition::Append,
},
ServerMessage::UploadComplete {
upload_id: "u-1".to_string(),
name: "avatar.png".to_string(),
size: 10,
content_type: Some("image/png".to_string()),
},
ServerMessage::UploadError {
upload_id: "u-1".to_string(),
message: "bad chunk".to_string(),
code: Some("upload_invalid_chunk".to_string()),
},
ServerMessage::Error {
message: "panic".to_string(),
code: Some("server_error".to_string()),
},
];
recorder.record_turn(&client, &server, 0, 1);
let artifact = recorder.artifact();
assert_eq!(artifact.redaction.redacted_text, "<mask>");
assert!(artifact.redaction.redact_server_html);
assert_eq!(artifact.steps.len(), 1);
let step = &artifact.steps[0];
match &step.client_message {
ClientMessage::Connect {
resume_token,
trace_id,
span_id,
parent_span_id,
correlation_id,
request_id,
..
} => {
assert_eq!(resume_token.as_deref(), Some("<mask>"));
assert_eq!(trace_id.as_deref(), Some("<mask>"));
assert_eq!(span_id.as_deref(), Some("<mask>"));
assert_eq!(parent_span_id.as_deref(), Some("<mask>"));
assert_eq!(correlation_id.as_deref(), Some("<mask>"));
assert_eq!(request_id.as_deref(), Some("<mask>"));
}
other => panic!("expected connect, got {other:?}"),
}
match &step.server_messages[0] {
ServerMessage::Hello {
session_id,
resume_token,
..
} => {
assert_eq!(session_id, "<mask>");
assert_eq!(resume_token.as_deref(), Some("<mask>"));
}
other => panic!("expected hello, got {other:?}"),
}
match &step.server_messages[1] {
ServerMessage::Patch { html, .. } => assert_eq!(html, "<mask>"),
other => panic!("expected patch, got {other:?}"),
}
match &step.server_messages[2] {
ServerMessage::Diff { slots, .. } => assert_eq!(slots[0].html, "<mask>"),
other => panic!("expected diff, got {other:?}"),
}
match &step.server_messages[3] {
ServerMessage::StreamInsert { html, .. } => assert_eq!(html, "<mask>"),
other => panic!("expected stream insert, got {other:?}"),
}
match &step.server_messages[4] {
ServerMessage::UploadComplete {
upload_id,
name,
content_type,
..
} => {
assert_eq!(upload_id, "<mask>");
assert_eq!(name, "<mask>");
assert_eq!(content_type.as_deref(), Some("<mask>"));
}
other => panic!("expected upload complete, got {other:?}"),
}
match &step.server_messages[5] {
ServerMessage::UploadError {
upload_id, message, ..
} => {
assert_eq!(upload_id, "<mask>");
assert_eq!(message, "<mask>");
}
other => panic!("expected upload error, got {other:?}"),
}
match &step.server_messages[6] {
ServerMessage::Error { message, .. } => assert_eq!(message, "<mask>"),
other => panic!("expected error, got {other:?}"),
}
}
#[test]
fn production_redaction_masks_sensitive_payloads() {
let trace = build_trace(TraceRedactionPolicy::production_safe());
let step = &trace.steps[0];
match &step.client_message {
ClientMessage::Event {
value, metadata, ..
} => {
assert_eq!(value["password"], "<redacted>");
assert_eq!(value["nested"]["token"], "<redacted>");
assert_eq!(metadata["authorization"], "<redacted>");
}
_ => panic!("expected event"),
}
match &step.server_messages[0] {
ServerMessage::Patch { html, .. } => assert_eq!(html, "<redacted>"),
_ => panic!("expected patch"),
}
}
#[test]
fn inspector_supports_time_travel_navigation() {
let trace = build_trace(TraceRedactionPolicy::developer_default());
let report = replay_trace(&trace, || Box::<CounterView>::default()).expect("replay report");
let mut inspector = TimeTravelInspector::from_report(&report);
assert_eq!(inspector.len(), 2);
assert_eq!(inspector.current().expect("current frame").sequence, 1);
inspector.next();
assert_eq!(inspector.current().expect("current frame").sequence, 2);
inspector.previous();
assert_eq!(inspector.current().expect("current frame").sequence, 1);
}
#[test]
fn inspector_frames_cover_client_kind_and_summary_variants() {
let connect = ClientMessage::Connect {
protocol: crate::PROTOCOL_VERSION_V1.to_string(),
session_id: None,
last_revision: None,
resume_token: None,
tenant_id: None,
trace_id: None,
span_id: None,
parent_span_id: None,
correlation_id: None,
request_id: None,
};
let steps = vec![
ReplayStepResult {
sequence: 1,
status: ReplayStepStatus::Match,
mismatch_reason: None,
expected_revision_before: 0,
expected_revision_after: 0,
actual_revision_before: 0,
actual_revision_after: 0,
client_message: connect,
expected_server_messages: vec![],
actual_server_messages: vec![],
},
ReplayStepResult {
sequence: 2,
status: ReplayStepStatus::Match,
mismatch_reason: None,
expected_revision_before: 0,
expected_revision_after: 1,
actual_revision_before: 0,
actual_revision_after: 1,
client_message: ClientMessage::Event {
event: "save".to_string(),
target: Some("form-1".to_string()),
value: json!({}),
metadata: serde_json::Map::new(),
},
expected_server_messages: vec![],
actual_server_messages: vec![],
},
ReplayStepResult {
sequence: 3,
status: ReplayStepStatus::Match,
mismatch_reason: None,
expected_revision_before: 1,
expected_revision_after: 1,
actual_revision_before: 1,
actual_revision_after: 1,
client_message: ClientMessage::PatchUrl {
to: "/users".to_string(),
},
expected_server_messages: vec![],
actual_server_messages: vec![],
},
ReplayStepResult {
sequence: 4,
status: ReplayStepStatus::Match,
mismatch_reason: None,
expected_revision_before: 1,
expected_revision_after: 1,
actual_revision_before: 1,
actual_revision_after: 1,
client_message: ClientMessage::Navigate {
to: "/users/1".to_string(),
},
expected_server_messages: vec![],
actual_server_messages: vec![],
},
ReplayStepResult {
sequence: 5,
status: ReplayStepStatus::Match,
mismatch_reason: None,
expected_revision_before: 1,
expected_revision_after: 1,
actual_revision_before: 1,
actual_revision_after: 1,
client_message: ClientMessage::Ping {
nonce: Some("n1".to_string()),
},
expected_server_messages: vec![],
actual_server_messages: vec![],
},
ReplayStepResult {
sequence: 6,
status: ReplayStepStatus::Match,
mismatch_reason: None,
expected_revision_before: 1,
expected_revision_after: 1,
actual_revision_before: 1,
actual_revision_after: 1,
client_message: ClientMessage::UploadStart {
upload_id: "u1".to_string(),
event: "uploaded".to_string(),
target: None,
name: "a.txt".to_string(),
size: 1,
content_type: None,
},
expected_server_messages: vec![],
actual_server_messages: vec![],
},
ReplayStepResult {
sequence: 7,
status: ReplayStepStatus::Match,
mismatch_reason: None,
expected_revision_before: 1,
expected_revision_after: 1,
actual_revision_before: 1,
actual_revision_after: 1,
client_message: ClientMessage::UploadChunk {
upload_id: "u1".to_string(),
offset: 0,
data: "AA==".to_string(),
},
expected_server_messages: vec![],
actual_server_messages: vec![],
},
ReplayStepResult {
sequence: 8,
status: ReplayStepStatus::Match,
mismatch_reason: None,
expected_revision_before: 1,
expected_revision_after: 1,
actual_revision_before: 1,
actual_revision_after: 1,
client_message: ClientMessage::UploadComplete {
upload_id: "u1".to_string(),
},
expected_server_messages: vec![],
actual_server_messages: vec![],
},
];
let report = ReplayReport {
format_version: REPLAY_TRACE_FORMAT_VERSION.to_string(),
metadata: SessionReplayMetadata {
protocol: crate::PROTOCOL_VERSION_V1.to_string(),
session_id: "sid".to_string(),
tenant_id: None,
target_id: "root".to_string(),
route_path: "/".to_string(),
route_params: BTreeMap::new(),
},
total_steps: steps.len(),
matched_steps: steps.len(),
first_mismatch_sequence: None,
revision_monotonic: true,
final_revision: 1,
steps,
};
let inspector = TimeTravelInspector::from_report(&report);
assert_eq!(inspector.frames()[0].client_kind, "connect");
assert_eq!(inspector.frames()[1].client_summary, "save -> form-1");
assert_eq!(inspector.frames()[2].client_summary, "patch_url /users");
assert_eq!(inspector.frames()[3].client_summary, "navigate /users/1");
assert_eq!(inspector.frames()[4].client_summary, "ping");
assert_eq!(inspector.frames()[5].client_summary, "upload_start u1");
assert_eq!(inspector.frames()[6].client_summary, "upload_chunk u1");
assert_eq!(inspector.frames()[7].client_summary, "upload_complete u1");
}
}