1use serde::{Deserialize, Serialize};
11use serde_json::Value;
12use std::{borrow::Cow, ops::Range, path::Path};
13
14use crate::connectors::{NormalizedConversation, NormalizedMessage, NormalizedSnippet};
15use crate::model::types::{Conversation, Message, MessageRole, Snippet};
16
/// Version of the conversation packet contract.
///
/// Stamped into `ConversationPacket::version` by the packet constructors and
/// fed (under the `"version"` label) into both packet hashes, so bumping it
/// changes every `semantic_hash` and `message_hash`.
pub const CONVERSATION_PACKET_VERSION: u32 = 1;
18
/// Identifies which constructor produced a [`ConversationPacket`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ConversationPacketBuilder {
    /// Built by `ConversationPacket::from_normalized_conversation` from a
    /// connector's `NormalizedConversation`.
    RawConnectorScan,
    /// Built by `ConversationPacket::from_canonical_replay` from a stored
    /// `Conversation`.
    CanonicalReplay,
}
24
/// Whether a packet's contract version matches [`CONVERSATION_PACKET_VERSION`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ConversationPacketVersionStatus {
    /// The packet uses the current contract version.
    Current,
    /// The packet reported `observed` while this code expects `expected`.
    Mismatch { expected: u32, observed: u32 },
}
30
/// Build diagnostics carried by every [`ConversationPacket`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConversationPacketDiagnostics {
    /// Which constructor produced the packet.
    pub builder: ConversationPacketBuilder,
    /// Contract version this code was built against; both constructors on the
    /// `impl` set it to [`CONVERSATION_PACKET_VERSION`].
    pub contract_version: u32,
    /// Whether the packet's version matched the contract version.
    pub version_status: ConversationPacketVersionStatus,
    /// Human-readable warnings; empty for `current`, one entry for
    /// `version_mismatch`.
    pub warnings: Vec<String>,
}
38
39impl ConversationPacketDiagnostics {
40 pub fn current(builder: ConversationPacketBuilder) -> Self {
41 Self {
42 builder,
43 contract_version: CONVERSATION_PACKET_VERSION,
44 version_status: ConversationPacketVersionStatus::Current,
45 warnings: Vec::new(),
46 }
47 }
48
49 pub fn version_mismatch(builder: ConversationPacketBuilder, observed: u32) -> Self {
50 Self {
51 builder,
52 contract_version: CONVERSATION_PACKET_VERSION,
53 version_status: ConversationPacketVersionStatus::Mismatch {
54 expected: CONVERSATION_PACKET_VERSION,
55 observed,
56 },
57 warnings: vec![format!(
58 "conversation packet version mismatch: expected {}, observed {}",
59 CONVERSATION_PACKET_VERSION, observed
60 )],
61 }
62 }
63}
64
/// Where a conversation came from, as supplied by the caller of the packet
/// constructors. All three fields are fed into the packet's semantic hash
/// (see `update_provenance_hash`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConversationPacketProvenance {
    /// Source identifier; `"local"` for `ConversationPacketProvenance::local`.
    pub source_id: String,
    /// Origin kind; `"local"` for `ConversationPacketProvenance::local`.
    pub origin_kind: String,
    /// Optional origin host; `None` for `ConversationPacketProvenance::local`.
    pub origin_host: Option<String>,
}
71
72impl ConversationPacketProvenance {
73 pub fn local() -> Self {
74 Self {
75 source_id: "local".to_string(),
76 origin_kind: "local".to_string(),
77 origin_host: None,
78 }
79 }
80}
81
/// Identifying fields of a conversation, copied from its source.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConversationPacketIdentity {
    /// Stored conversation id. It is `None` for raw connector scans and is not
    /// fed into the packet hashes.
    pub conversation_id: Option<i64>,
    /// Agent slug copied from the source conversation.
    pub agent_slug: String,
    /// External identifier copied from the source conversation, if any.
    pub external_id: Option<String>,
    /// Workspace path, rendered with `Path::to_string_lossy`.
    pub workspace: Option<String>,
    /// Source file path, rendered with `Path::to_string_lossy`.
    pub source_path: String,
    /// Conversation title, if any.
    pub title: Option<String>,
}
91
/// Conversation-level timestamps plus bounds derived from the messages.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConversationPacketTimestamps {
    /// `started_at` copied from the source conversation.
    pub started_at: Option<i64>,
    /// `ended_at` copied from the source conversation.
    pub ended_at: Option<i64>,
    /// Smallest `created_at` among the messages, or `None` if no message has one.
    pub first_message_at: Option<i64>,
    /// Largest `created_at` among the messages, or `None` if no message has one.
    pub last_message_at: Option<i64>,
}
99
/// A code snippet attached to a packet message.
///
/// Any snippet id from the source is not carried over.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConversationPacketSnippet {
    /// Snippet file path, rendered with `Path::to_string_lossy`.
    pub file_path: Option<String>,
    /// First line of the snippet, as reported by the source.
    pub start_line: Option<i64>,
    /// Last line of the snippet, as reported by the source.
    pub end_line: Option<i64>,
    /// Language tag, if any.
    pub language: Option<String>,
    /// Snippet text, if any.
    pub snippet_text: Option<String>,
}
108
/// One message of a packet, normalized from either a raw or a canonical source.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ConversationPacketMessage {
    /// Stored message id. It is `None` for raw connector scans and is not fed
    /// into the packet hashes.
    pub message_id: Option<i64>,
    /// Message index as reported by the source.
    pub idx: i64,
    /// Normalized role.
    ///
    /// The value is `"user"`, `"assistant"` (also used for agent roles),
    /// `"tool"`, `"system"`, or, for any other role, the trimmed,
    /// ASCII-lowercased original.
    pub role: String,
    /// Author, if the source recorded one.
    pub author: Option<String>,
    /// Creation timestamp, if the source recorded one.
    pub created_at: Option<i64>,
    /// Message text; may be empty.
    pub content: String,
    /// Per-message extra JSON copied from the source.
    pub extra_json: Value,
    /// Code snippets attached to the message.
    pub snippets: Vec<ConversationPacketSnippet>,
}
120
/// The conversation data a packet carries.
///
/// `packet_hashes` and `packet_projections` are computed from this payload
/// when a packet is built.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ConversationPacketPayload {
    /// Identifying fields of the conversation.
    pub identity: ConversationPacketIdentity,
    /// Provenance supplied by the caller.
    pub provenance: ConversationPacketProvenance,
    /// Conversation and message timestamp bounds.
    pub timestamps: ConversationPacketTimestamps,
    /// Conversation-level metadata JSON copied from the source.
    pub metadata_json: Value,
    /// Messages, in the order the source conversation lists them.
    pub messages: Vec<ConversationPacketMessage>,
}
129
130#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
131pub struct ConversationPacketHashes {
132 pub semantic_hash: String,
137 pub message_hash: String,
139}
140
/// Messages selected for the lexical sink.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConversationPacketLexicalProjection {
    /// Indices into `payload.messages`; `packet_projections` keeps only
    /// messages with non-empty content.
    pub message_indices: Vec<usize>,
    /// Content byte total as computed by `packet_projections`. It covers all
    /// payload messages; empty messages contribute zero.
    pub total_content_bytes: usize,
}
146
/// Messages selected for the semantic sink.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConversationPacketSemanticProjection {
    /// Indices into `payload.messages`; `packet_projections` keeps only
    /// messages with non-empty content.
    pub message_indices: Vec<usize>,
    /// Content byte total as computed by `packet_projections`. It covers all
    /// payload messages; empty messages contribute zero.
    pub total_content_bytes: usize,
}
152
/// Per-role message counts.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConversationPacketAnalyticsProjection {
    /// Messages whose normalized role is `"user"`.
    pub user_messages: usize,
    /// Messages whose normalized role is `"assistant"`.
    pub assistant_messages: usize,
    /// Messages whose normalized role is `"tool"`.
    pub tool_messages: usize,
    /// Messages whose normalized role is `"system"`.
    pub system_messages: usize,
    /// Messages with any other role.
    pub other_messages: usize,
}
161
/// Per-sink views derived from `payload.messages` when a packet is built.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConversationPacketSinkProjections {
    /// View consumed by the lexical sink.
    pub lexical: ConversationPacketLexicalProjection,
    /// View consumed by the semantic sink.
    pub semantic: ConversationPacketSemanticProjection,
    /// Per-role message counts.
    pub analytics: ConversationPacketAnalyticsProjection,
}
168
/// A versioned, hashed view of one conversation.
///
/// Both the raw connector scan and the canonical replay paths build this
/// type, so their results can be compared with
/// `ConversationPacket::semantically_equivalent_to`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ConversationPacket {
    /// Contract version the packet was built with.
    pub version: u32,
    /// Which builder produced the packet and any version warnings.
    pub diagnostics: ConversationPacketDiagnostics,
    /// The conversation data.
    pub payload: ConversationPacketPayload,
    /// BLAKE3 hashes derived from the payload.
    pub hashes: ConversationPacketHashes,
    /// Per-sink views derived from the payload's messages.
    pub projections: ConversationPacketSinkProjections,
}
177
/// Consumer of packet text, which determines which messages a text batch contains.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ConversationPacketTextSink {
    /// Uses `projections.lexical.message_indices`.
    Lexical,
    /// Uses `projections.semantic.message_indices`.
    Semantic,
    /// Uses every payload message, including ones with empty content.
    Fingerprint,
}
184
/// How a text batch holds its message content.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConversationPacketTextBatchMode {
    /// Content is borrowed from a `ConversationPacketTextSlab`.
    Slab,
    /// Content is cloned from the payload by
    /// `ConversationPacket::owned_text_batch_fallback`.
    OwnedFallback,
}
190
/// A sink projection referenced a message index that does not exist.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConversationPacketProjectionError {
    /// Sink whose projection was being resolved.
    pub sink: ConversationPacketTextSink,
    /// The out-of-range index the projection referenced.
    pub message_index: usize,
    /// Number of messages available.
    ///
    /// This is the packet payload's count when the payload lacks the index,
    /// or the slab's count when the slab lacks it.
    pub message_count: usize,
}
197
198impl std::fmt::Display for ConversationPacketProjectionError {
199 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
200 write!(
201 formatter,
202 "{:?} packet projection references message index {} but packet has {} messages",
203 self.sink, self.message_index, self.message_count
204 )
205 }
206}
207
/// Allows the projection error to be boxed or propagated as a
/// `std::error::Error`; its message comes from the `Display` impl.
impl std::error::Error for ConversationPacketProjectionError {}
209
/// One message's text and metadata as handed to a sink.
///
/// String fields are borrowed in slab batches and owned in fallback batches.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConversationPacketTextMessage<'a> {
    /// Index of this message in the packet's `payload.messages`.
    pub message_index: usize,
    /// Stored message id, if any.
    pub message_id: Option<i64>,
    /// Message index as reported by the source.
    pub idx: i64,
    /// Normalized role.
    pub role: Cow<'a, str>,
    /// Author, if any.
    pub author: Option<Cow<'a, str>>,
    /// Creation timestamp, if any.
    pub created_at: Option<i64>,
    /// Message text.
    pub content: Cow<'a, str>,
}
220
/// The messages selected for one sink, plus how their text is held.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConversationPacketTextBatch<'a> {
    /// Sink the batch was built for.
    pub sink: ConversationPacketTextSink,
    /// Whether content is borrowed from a slab or owned.
    pub mode: ConversationPacketTextBatchMode,
    /// Content byte total reported for the batch.
    ///
    /// For slab batches this is the sink's projection total, or all payload
    /// bytes for `Fingerprint`. For fallback batches it is the byte total of
    /// every payload message.
    pub total_content_bytes: usize,
    /// Selected messages; read-only access goes through `messages()`.
    messages: Vec<ConversationPacketTextMessage<'a>>,
}
228
229impl<'a> ConversationPacketTextBatch<'a> {
230 pub fn messages(&self) -> &[ConversationPacketTextMessage<'a>] {
231 &self.messages
232 }
233
234 pub fn len(&self) -> usize {
235 self.messages.len()
236 }
237
238 pub fn is_empty(&self) -> bool {
239 self.messages.is_empty()
240 }
241}
242
/// All of a packet's message contents stored in one `String`, so sink
/// batches can borrow message text from a single allocation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConversationPacketTextSlab {
    /// Message contents concatenated in payload order, with no separators.
    text: String,
    /// Byte range of each message's content within `text`, indexed like
    /// `payload.messages`.
    message_ranges: Vec<Range<usize>>,
}
248
249impl ConversationPacketTextSlab {
250 pub fn from_packet(packet: &ConversationPacket) -> Self {
251 let mut text = String::with_capacity(packet_total_content_bytes(&packet.payload.messages));
252 let mut message_ranges = Vec::with_capacity(packet.payload.messages.len());
253 for message in &packet.payload.messages {
254 let start = text.len();
255 text.push_str(&message.content);
256 let end = text.len();
257 message_ranges.push(start..end);
258 }
259 Self {
260 text,
261 message_ranges,
262 }
263 }
264
265 pub fn text(&self) -> &str {
266 &self.text
267 }
268
269 pub fn message_count(&self) -> usize {
270 self.message_ranges.len()
271 }
272
273 pub fn is_empty(&self) -> bool {
274 self.text.is_empty()
275 }
276
277 pub fn message_content(&self, message_index: usize) -> Option<&str> {
278 self.message_ranges
279 .get(message_index)
280 .and_then(|range| self.text.get(range.clone()))
281 }
282
283 pub fn message_range(&self, message_index: usize) -> Option<Range<usize>> {
284 self.message_ranges.get(message_index).cloned()
285 }
286
287 pub fn sink_batch<'a>(
288 &'a self,
289 packet: &'a ConversationPacket,
290 sink: ConversationPacketTextSink,
291 ) -> Result<ConversationPacketTextBatch<'a>, ConversationPacketProjectionError> {
292 let indices = packet.sink_message_indices(sink);
293 let mut messages = Vec::with_capacity(indices.len());
294 for &message_index in indices.iter() {
295 let Some(message) = packet.payload.messages.get(message_index) else {
296 return Err(ConversationPacketProjectionError {
297 sink,
298 message_index,
299 message_count: packet.payload.messages.len(),
300 });
301 };
302 let Some(content) = self.message_content(message_index) else {
303 return Err(ConversationPacketProjectionError {
304 sink,
305 message_index,
306 message_count: self.message_count(),
307 });
308 };
309 messages.push(ConversationPacketTextMessage {
310 message_index,
311 message_id: message.message_id,
312 idx: message.idx,
313 role: Cow::Borrowed(message.role.as_str()),
314 author: message.author.as_deref().map(Cow::Borrowed),
315 created_at: message.created_at,
316 content: Cow::Borrowed(content),
317 });
318 }
319 Ok(ConversationPacketTextBatch {
320 sink,
321 mode: ConversationPacketTextBatchMode::Slab,
322 total_content_bytes: packet.sink_total_content_bytes(sink),
323 messages,
324 })
325 }
326}
327
328impl ConversationPacket {
329 pub fn from_normalized_conversation(
330 conversation: &NormalizedConversation,
331 provenance: ConversationPacketProvenance,
332 ) -> Self {
333 let messages = conversation
334 .messages
335 .iter()
336 .map(packet_message_from_normalized)
337 .collect::<Vec<_>>();
338 let payload = ConversationPacketPayload {
339 identity: ConversationPacketIdentity {
340 conversation_id: None,
341 agent_slug: conversation.agent_slug.clone(),
342 external_id: conversation.external_id.clone(),
343 workspace: conversation.workspace.as_deref().map(path_to_packet_string),
344 source_path: path_to_packet_string(&conversation.source_path),
345 title: conversation.title.clone(),
346 },
347 provenance,
348 timestamps: timestamps_from_parts(
349 conversation.started_at,
350 conversation.ended_at,
351 &messages,
352 ),
353 metadata_json: conversation.metadata.clone(),
354 messages,
355 };
356 Self::from_payload(payload, ConversationPacketBuilder::RawConnectorScan)
357 }
358
359 pub fn from_canonical_replay(
360 conversation: &Conversation,
361 provenance: ConversationPacketProvenance,
362 ) -> Self {
363 let messages = conversation
364 .messages
365 .iter()
366 .map(packet_message_from_canonical)
367 .collect::<Vec<_>>();
368 let payload = ConversationPacketPayload {
369 identity: ConversationPacketIdentity {
370 conversation_id: conversation.id,
371 agent_slug: conversation.agent_slug.clone(),
372 external_id: conversation.external_id.clone(),
373 workspace: conversation.workspace.as_deref().map(path_to_packet_string),
374 source_path: path_to_packet_string(&conversation.source_path),
375 title: conversation.title.clone(),
376 },
377 provenance,
378 timestamps: timestamps_from_parts(
379 conversation.started_at,
380 conversation.ended_at,
381 &messages,
382 ),
383 metadata_json: conversation.metadata_json.clone(),
384 messages,
385 };
386 Self::from_payload(payload, ConversationPacketBuilder::CanonicalReplay)
387 }
388
389 pub fn semantically_equivalent_to(&self, other: &Self) -> bool {
390 self.version == other.version
391 && self.hashes == other.hashes
392 && self.projections == other.projections
393 }
394
395 pub fn text_slab(&self) -> ConversationPacketTextSlab {
396 ConversationPacketTextSlab::from_packet(self)
397 }
398
399 pub fn owned_text_batch_fallback(
400 &self,
401 sink: ConversationPacketTextSink,
402 ) -> ConversationPacketTextBatch<'static> {
403 let indices = fallback_sink_message_indices(sink, &self.payload.messages);
404 let messages = indices
405 .into_iter()
406 .filter_map(|message_index| {
407 let message = self.payload.messages.get(message_index)?;
408 Some(ConversationPacketTextMessage {
409 message_index,
410 message_id: message.message_id,
411 idx: message.idx,
412 role: Cow::Owned(message.role.clone()),
413 author: message.author.clone().map(Cow::Owned),
414 created_at: message.created_at,
415 content: Cow::Owned(message.content.clone()),
416 })
417 })
418 .collect();
419 ConversationPacketTextBatch {
420 sink,
421 mode: ConversationPacketTextBatchMode::OwnedFallback,
422 total_content_bytes: packet_total_content_bytes(&self.payload.messages),
423 messages,
424 }
425 }
426
427 fn from_payload(
428 payload: ConversationPacketPayload,
429 builder: ConversationPacketBuilder,
430 ) -> Self {
431 let hashes = packet_hashes(&payload);
432 let projections = packet_projections(&payload.messages);
433 Self {
434 version: CONVERSATION_PACKET_VERSION,
435 diagnostics: ConversationPacketDiagnostics::current(builder),
436 payload,
437 hashes,
438 projections,
439 }
440 }
441
442 fn sink_message_indices(&self, sink: ConversationPacketTextSink) -> Cow<'_, [usize]> {
443 match sink {
444 ConversationPacketTextSink::Lexical => {
445 Cow::Borrowed(&self.projections.lexical.message_indices)
446 }
447 ConversationPacketTextSink::Semantic => {
448 Cow::Borrowed(&self.projections.semantic.message_indices)
449 }
450 ConversationPacketTextSink::Fingerprint => {
451 Cow::Owned((0..self.payload.messages.len()).collect())
452 }
453 }
454 }
455
456 fn sink_total_content_bytes(&self, sink: ConversationPacketTextSink) -> usize {
457 match sink {
458 ConversationPacketTextSink::Lexical => self.projections.lexical.total_content_bytes,
459 ConversationPacketTextSink::Semantic => self.projections.semantic.total_content_bytes,
460 ConversationPacketTextSink::Fingerprint => {
461 packet_total_content_bytes(&self.payload.messages)
462 }
463 }
464 }
465}
466
467fn fallback_sink_message_indices(
468 sink: ConversationPacketTextSink,
469 messages: &[ConversationPacketMessage],
470) -> Vec<usize> {
471 match sink {
472 ConversationPacketTextSink::Lexical | ConversationPacketTextSink::Semantic => messages
473 .iter()
474 .enumerate()
475 .filter(|(_, message)| !message.content.is_empty())
476 .map(|(idx, _)| idx)
477 .collect(),
478 ConversationPacketTextSink::Fingerprint => (0..messages.len()).collect(),
479 }
480}
481
482fn packet_total_content_bytes(messages: &[ConversationPacketMessage]) -> usize {
483 messages
484 .iter()
485 .map(|message| message.content.len())
486 .sum::<usize>()
487}
488
/// Renders `path` as a `String`; non-UTF-8 sequences become U+FFFD.
fn path_to_packet_string(path: &Path) -> String {
    String::from(path.to_string_lossy())
}
492
/// Normalizes a free-form role string.
///
/// The input is trimmed and ASCII-lowercased, and `"agent"` is mapped to
/// `"assistant"`. Every other value (including `"user"`, `"tool"` and
/// `"system"`) is returned in its trimmed, lowercased form.
fn normalize_role(role: &str) -> String {
    let lowered = role.trim().to_ascii_lowercase();
    if lowered == "agent" {
        "assistant".to_string()
    } else {
        lowered
    }
}
502
503fn canonical_role(role: &MessageRole) -> String {
504 match role {
505 MessageRole::User => "user".to_string(),
506 MessageRole::Agent => "assistant".to_string(),
507 MessageRole::Tool => "tool".to_string(),
508 MessageRole::System => "system".to_string(),
509 MessageRole::Other(other) => normalize_role(other),
510 }
511}
512
513fn packet_message_from_normalized(message: &NormalizedMessage) -> ConversationPacketMessage {
514 ConversationPacketMessage {
515 message_id: None,
516 idx: message.idx,
517 role: normalize_role(&message.role),
518 author: message.author.clone(),
519 created_at: message.created_at,
520 content: message.content.clone(),
521 extra_json: message.extra.clone(),
522 snippets: message
523 .snippets
524 .iter()
525 .map(packet_snippet_from_normalized)
526 .collect(),
527 }
528}
529
530fn packet_message_from_canonical(message: &Message) -> ConversationPacketMessage {
531 ConversationPacketMessage {
532 message_id: message.id,
533 idx: message.idx,
534 role: canonical_role(&message.role),
535 author: message.author.clone(),
536 created_at: message.created_at,
537 content: message.content.clone(),
538 extra_json: message.extra_json.clone(),
539 snippets: message
540 .snippets
541 .iter()
542 .map(packet_snippet_from_canonical)
543 .collect(),
544 }
545}
546
547fn packet_snippet_from_normalized(snippet: &NormalizedSnippet) -> ConversationPacketSnippet {
548 ConversationPacketSnippet {
549 file_path: snippet.file_path.as_deref().map(path_to_packet_string),
550 start_line: snippet.start_line,
551 end_line: snippet.end_line,
552 language: snippet.language.clone(),
553 snippet_text: snippet.snippet_text.clone(),
554 }
555}
556
557fn packet_snippet_from_canonical(snippet: &Snippet) -> ConversationPacketSnippet {
558 ConversationPacketSnippet {
559 file_path: snippet.file_path.as_deref().map(path_to_packet_string),
560 start_line: snippet.start_line,
561 end_line: snippet.end_line,
562 language: snippet.language.clone(),
563 snippet_text: snippet.snippet_text.clone(),
564 }
565}
566
567fn timestamps_from_parts(
568 started_at: Option<i64>,
569 ended_at: Option<i64>,
570 messages: &[ConversationPacketMessage],
571) -> ConversationPacketTimestamps {
572 let first_message_at = messages
573 .iter()
574 .filter_map(|message| message.created_at)
575 .min();
576 let last_message_at = messages
577 .iter()
578 .filter_map(|message| message.created_at)
579 .max();
580 ConversationPacketTimestamps {
581 started_at,
582 ended_at,
583 first_message_at,
584 last_message_at,
585 }
586}
587
588fn packet_projections(messages: &[ConversationPacketMessage]) -> ConversationPacketSinkProjections {
589 let message_indices = messages
590 .iter()
591 .enumerate()
592 .filter(|(_, message)| !message.content.is_empty())
593 .map(|(idx, _)| idx)
594 .collect::<Vec<_>>();
595 let total_content_bytes = messages
596 .iter()
597 .map(|message| message.content.len())
598 .sum::<usize>();
599 let mut analytics = ConversationPacketAnalyticsProjection {
600 user_messages: 0,
601 assistant_messages: 0,
602 tool_messages: 0,
603 system_messages: 0,
604 other_messages: 0,
605 };
606 for message in messages {
607 match message.role.as_str() {
608 "user" => analytics.user_messages += 1,
609 "assistant" => analytics.assistant_messages += 1,
610 "tool" => analytics.tool_messages += 1,
611 "system" => analytics.system_messages += 1,
612 _ => analytics.other_messages += 1,
613 }
614 }
615 ConversationPacketSinkProjections {
616 lexical: ConversationPacketLexicalProjection {
617 message_indices: message_indices.clone(),
618 total_content_bytes,
619 },
620 semantic: ConversationPacketSemanticProjection {
621 message_indices,
622 total_content_bytes,
623 },
624 analytics,
625 }
626}
627
628fn packet_hashes(payload: &ConversationPacketPayload) -> ConversationPacketHashes {
629 let mut semantic = blake3::Hasher::new();
630 update_u32(&mut semantic, "version", CONVERSATION_PACKET_VERSION);
631 update_identity_hash(&mut semantic, &payload.identity);
632 update_provenance_hash(&mut semantic, &payload.provenance);
633 update_timestamps_hash(&mut semantic, &payload.timestamps);
634 update_json(&mut semantic, "metadata_json", &payload.metadata_json);
635 update_messages_hash(&mut semantic, &payload.messages);
636
637 let mut messages = blake3::Hasher::new();
638 update_u32(&mut messages, "version", CONVERSATION_PACKET_VERSION);
639 update_messages_hash(&mut messages, &payload.messages);
640
641 ConversationPacketHashes {
642 semantic_hash: semantic.finalize().to_hex().to_string(),
643 message_hash: messages.finalize().to_hex().to_string(),
644 }
645}
646
/// Feeds the hashed identity fields into `hasher`, in a fixed order.
///
/// `conversation_id` is not hashed. This is what lets a raw scan (no stored
/// id) and a canonical replay of the same conversation hash identically.
fn update_identity_hash(hasher: &mut blake3::Hasher, identity: &ConversationPacketIdentity) {
    update_str(hasher, "agent_slug", &identity.agent_slug);
    update_opt_str(hasher, "external_id", identity.external_id.as_deref());
    update_opt_str(hasher, "workspace", identity.workspace.as_deref());
    update_str(hasher, "source_path", &identity.source_path);
    update_opt_str(hasher, "title", identity.title.as_deref());
}
654
/// Feeds all provenance fields into `hasher`, in a fixed order.
fn update_provenance_hash(hasher: &mut blake3::Hasher, provenance: &ConversationPacketProvenance) {
    update_str(hasher, "source_id", &provenance.source_id);
    update_str(hasher, "origin_kind", &provenance.origin_kind);
    update_opt_str(hasher, "origin_host", provenance.origin_host.as_deref());
}
660
/// Feeds all four timestamp fields into `hasher`, in a fixed order.
fn update_timestamps_hash(hasher: &mut blake3::Hasher, timestamps: &ConversationPacketTimestamps) {
    update_opt_i64(hasher, "started_at", timestamps.started_at);
    update_opt_i64(hasher, "ended_at", timestamps.ended_at);
    update_opt_i64(hasher, "first_message_at", timestamps.first_message_at);
    update_opt_i64(hasher, "last_message_at", timestamps.last_message_at);
}
667
/// Feeds the message count and then every message (with its snippets) into
/// `hasher`.
///
/// Stored ids (`message_id`, and the source snippet id, which is never copied
/// into the packet) are not hashed. Field order and labels are part of the
/// hash contract: changing either changes every packet hash.
fn update_messages_hash(hasher: &mut blake3::Hasher, messages: &[ConversationPacketMessage]) {
    // The count prefix keeps message boundaries unambiguous.
    update_usize(hasher, "message_count", messages.len());
    for message in messages {
        update_i64(hasher, "message_idx", message.idx);
        update_str(hasher, "message_role", &message.role);
        update_opt_str(hasher, "message_author", message.author.as_deref());
        update_opt_i64(hasher, "message_created_at", message.created_at);
        update_str(hasher, "message_content", &message.content);
        update_json(hasher, "message_extra_json", &message.extra_json);
        update_usize(hasher, "snippet_count", message.snippets.len());
        for snippet in &message.snippets {
            update_opt_str(hasher, "snippet_file_path", snippet.file_path.as_deref());
            update_opt_i64(hasher, "snippet_start_line", snippet.start_line);
            update_opt_i64(hasher, "snippet_end_line", snippet.end_line);
            update_opt_str(hasher, "snippet_language", snippet.language.as_deref());
            update_opt_str(hasher, "snippet_text", snippet.snippet_text.as_deref());
        }
    }
}
687
/// Writes `label` followed by a NUL byte, marking the start of a field in the
/// hash stream.
fn update_label(hasher: &mut blake3::Hasher, label: &str) {
    hasher.update(label.as_bytes());
    hasher.update(&[0]);
}
692
/// Writes `label`, then the string's byte length (as a labelled `u64`), then
/// its raw bytes.
fn update_str(hasher: &mut blake3::Hasher, label: &str, value: &str) {
    update_label(hasher, label);
    // Length prefix so adjacent strings cannot run together.
    update_usize(hasher, "len", value.len());
    hasher.update(value.as_bytes());
}
698
699fn update_opt_str(hasher: &mut blake3::Hasher, label: &str, value: Option<&str>) {
700 match value {
701 Some(value) => {
702 update_label(hasher, label);
703 hasher.update(&[1]);
704 update_usize(hasher, "len", value.len());
705 hasher.update(value.as_bytes());
706 }
707 None => {
708 update_label(hasher, label);
709 hasher.update(&[0]);
710 }
711 }
712}
713
714fn update_json(hasher: &mut blake3::Hasher, label: &str, value: &Value) {
715 let stable = serde_json::to_string(value).unwrap_or_else(|_| "null".to_string());
716 update_str(hasher, label, &stable);
717}
718
/// Writes `label` followed by the value's 8 little-endian bytes.
fn update_i64(hasher: &mut blake3::Hasher, label: &str, value: i64) {
    update_label(hasher, label);
    hasher.update(&value.to_le_bytes());
}
723
/// Writes `label`, then a presence tag.
///
/// `Some` writes tag `1` followed by 8 little-endian bytes; `None` writes only
/// tag `0`.
fn update_opt_i64(hasher: &mut blake3::Hasher, label: &str, value: Option<i64>) {
    update_label(hasher, label);
    match value {
        Some(value) => {
            hasher.update(&[1]);
            hasher.update(&value.to_le_bytes());
        }
        None => {
            hasher.update(&[0]);
        }
    }
}
736
/// Writes `label` followed by the value's 4 little-endian bytes.
fn update_u32(hasher: &mut blake3::Hasher, label: &str, value: u32) {
    update_label(hasher, label);
    hasher.update(&value.to_le_bytes());
}
741
/// Writes `label` followed by the value widened to a little-endian `u64`.
///
/// Using a fixed width keeps hashes identical across 32- and 64-bit targets.
fn update_usize(hasher: &mut blake3::Hasher, label: &str, value: usize) {
    update_label(hasher, label);
    // Saturates only if `usize` were wider than 64 bits.
    let value = u64::try_from(value).unwrap_or(u64::MAX);
    hasher.update(&value.to_le_bytes());
}
747
#[cfg(test)]
mod tests {
    use super::*;
    use crate::connectors::{NormalizedConversation, NormalizedMessage, NormalizedSnippet};
    use crate::model::types::{Conversation, Message, MessageRole, Snippet};
    use serde_json::json;
    use std::path::PathBuf;

    /// Raw-connector fixture: a user message with one snippet, followed by an
    /// assistant reply. It mirrors `canonical_conversation` except for stored ids.
    fn raw_conversation() -> NormalizedConversation {
        NormalizedConversation {
            agent_slug: "codex".to_string(),
            external_id: Some("session-1".to_string()),
            title: Some("Packet contract".to_string()),
            workspace: Some(PathBuf::from("/work/cass")),
            source_path: PathBuf::from("/work/cass/.codex/session.jsonl"),
            started_at: Some(1_700_000_000_000),
            ended_at: Some(1_700_000_010_000),
            metadata: json!({"model": "gpt-5", "temperature": 0}),
            messages: vec![
                NormalizedMessage {
                    idx: 0,
                    role: "user".to_string(),
                    author: Some("human".to_string()),
                    created_at: Some(1_700_000_000_000),
                    content: "build the packet".to_string(),
                    extra: json!({"turn": 1}),
                    snippets: vec![NormalizedSnippet {
                        file_path: Some(PathBuf::from("src/main.rs")),
                        start_line: Some(10),
                        end_line: Some(12),
                        language: Some("rust".to_string()),
                        snippet_text: Some("fn main() {}".to_string()),
                    }],
                    invocations: Vec::new(),
                },
                NormalizedMessage {
                    idx: 1,
                    role: "assistant".to_string(),
                    author: None,
                    created_at: Some(1_700_000_001_000),
                    content: "packet built".to_string(),
                    extra: json!({}),
                    snippets: Vec::new(),
                    invocations: Vec::new(),
                },
            ],
        }
    }

    /// Canonical (stored) fixture with the same content as `raw_conversation`,
    /// plus stored conversation, message and snippet ids.
    fn canonical_conversation() -> Conversation {
        Conversation {
            id: Some(42),
            agent_slug: "codex".to_string(),
            workspace: Some(PathBuf::from("/work/cass")),
            external_id: Some("session-1".to_string()),
            title: Some("Packet contract".to_string()),
            source_path: PathBuf::from("/work/cass/.codex/session.jsonl"),
            started_at: Some(1_700_000_000_000),
            ended_at: Some(1_700_000_010_000),
            approx_tokens: None,
            metadata_json: json!({"model": "gpt-5", "temperature": 0}),
            source_id: "local".to_string(),
            origin_host: None,
            messages: vec![
                Message {
                    id: Some(100),
                    idx: 0,
                    role: MessageRole::User,
                    author: Some("human".to_string()),
                    created_at: Some(1_700_000_000_000),
                    content: "build the packet".to_string(),
                    extra_json: json!({"turn": 1}),
                    snippets: vec![Snippet {
                        id: Some(7),
                        file_path: Some(PathBuf::from("src/main.rs")),
                        start_line: Some(10),
                        end_line: Some(12),
                        language: Some("rust".to_string()),
                        snippet_text: Some("fn main() {}".to_string()),
                    }],
                },
                Message {
                    id: Some(101),
                    idx: 1,
                    role: MessageRole::Agent,
                    author: None,
                    created_at: Some(1_700_000_001_000),
                    content: "packet built".to_string(),
                    extra_json: json!({}),
                    snippets: Vec::new(),
                },
            ],
        }
    }

    // Raw and canonical builds of the same conversation must be equivalent
    // despite differing stored ids; the canonical `Agent` role must normalize
    // to "assistant".
    #[test]
    fn raw_and_canonical_builders_produce_equivalent_packet_semantics() {
        let provenance = ConversationPacketProvenance::local();
        let raw = ConversationPacket::from_normalized_conversation(
            &raw_conversation(),
            provenance.clone(),
        );
        let canonical =
            ConversationPacket::from_canonical_replay(&canonical_conversation(), provenance);

        assert_eq!(raw.version, CONVERSATION_PACKET_VERSION);
        assert!(raw.semantically_equivalent_to(&canonical));
        assert_eq!(raw.payload.messages[1].role, "assistant");
        assert_eq!(canonical.payload.messages[1].role, "assistant");
        assert_eq!(raw.projections.lexical.message_indices, vec![0, 1]);
        assert_eq!(raw.projections.analytics.user_messages, 1);
        assert_eq!(raw.projections.analytics.assistant_messages, 1);
    }

    // Editing one message's content must change both hashes.
    #[test]
    fn packet_hash_changes_when_normalized_content_changes() {
        let mut changed = raw_conversation();
        changed.messages[1].content = "packet changed".to_string();

        let original = ConversationPacket::from_normalized_conversation(
            &raw_conversation(),
            ConversationPacketProvenance::local(),
        );
        let changed = ConversationPacket::from_normalized_conversation(
            &changed,
            ConversationPacketProvenance::local(),
        );

        assert_ne!(original.hashes.semantic_hash, changed.hashes.semantic_hash);
        assert_ne!(original.hashes.message_hash, changed.hashes.message_hash);
    }

    // Slab ranges respect UTF-8 boundaries. Lexical and semantic batches borrow
    // from the slab (pointer equality) and skip the empty message, while the
    // fingerprint batch includes it.
    #[test]
    fn text_slab_reuses_one_utf8_arena_for_packet_sinks() {
        let mut canonical = canonical_conversation();
        canonical.messages[0].content = format!("build {} packet", "\u{2603}");
        canonical.messages.push(Message {
            id: Some(102),
            idx: 2,
            role: MessageRole::System,
            author: None,
            created_at: Some(1_700_000_002_000),
            content: String::new(),
            extra_json: json!({}),
            snippets: Vec::new(),
        });
        let packet = ConversationPacket::from_canonical_replay(
            &canonical,
            ConversationPacketProvenance::local(),
        );
        let slab = packet.text_slab();

        assert_eq!(slab.message_count(), 3);
        let range = slab
            .message_range(0)
            .expect("first message should have a slab range");
        assert!(slab.text().is_char_boundary(range.start));
        assert!(slab.text().is_char_boundary(range.end));
        assert_eq!(
            slab.message_content(0),
            Some(packet.payload.messages[0].content.as_str())
        );

        let lexical = slab
            .sink_batch(&packet, ConversationPacketTextSink::Lexical)
            .expect("lexical projection should borrow from the slab");
        let semantic = slab
            .sink_batch(&packet, ConversationPacketTextSink::Semantic)
            .expect("semantic projection should borrow from the slab");
        let fingerprint = slab
            .sink_batch(&packet, ConversationPacketTextSink::Fingerprint)
            .expect("fingerprint projection should cover all messages");

        assert_eq!(lexical.mode, ConversationPacketTextBatchMode::Slab);
        assert_eq!(lexical.len(), 2, "empty content stays out of lexical");
        assert_eq!(semantic.len(), 2, "empty content stays out of semantic");
        assert_eq!(fingerprint.len(), 3, "fingerprint sees every message");
        assert!(fingerprint.messages()[2].content.is_empty());

        let slab_content = slab
            .message_content(0)
            .expect("first message should be readable from the slab");
        assert!(std::ptr::eq(
            lexical.messages()[0].content.as_ref().as_ptr(),
            slab_content.as_ptr()
        ));
        assert!(std::ptr::eq(
            semantic.messages()[0].content.as_ref().as_ptr(),
            slab_content.as_ptr()
        ));
        assert_eq!(
            lexical.messages()[0].content.as_ref(),
            "build \u{2603} packet"
        );
    }

    // A projection that references a missing message is rejected by the slab
    // path, and the owned fallback recomputes the selection from the payload.
    #[test]
    fn owned_text_batch_fallback_recovers_from_bad_projection() {
        let mut packet = ConversationPacket::from_canonical_replay(
            &canonical_conversation(),
            ConversationPacketProvenance::local(),
        );
        packet.projections.semantic.message_indices = vec![0, 99];
        let slab = packet.text_slab();
        let err = slab
            .sink_batch(&packet, ConversationPacketTextSink::Semantic)
            .expect_err("bad projection should not build a slab view");

        assert_eq!(err.sink, ConversationPacketTextSink::Semantic);
        assert_eq!(err.message_index, 99);
        assert_eq!(err.message_count, packet.payload.messages.len());

        let fallback = packet.owned_text_batch_fallback(ConversationPacketTextSink::Semantic);
        assert_eq!(
            fallback.mode,
            ConversationPacketTextBatchMode::OwnedFallback
        );
        assert_eq!(fallback.len(), 2);
        assert!(
            matches!(fallback.messages()[0].content, Cow::Owned(_)),
            "fallback should own content instead of borrowing from the slab"
        );
        assert_eq!(fallback.messages()[0].content.as_ref(), "build the packet");
    }

    // The mismatch diagnostic records both versions and a readable warning.
    #[test]
    fn version_mismatch_diagnostic_is_explicit() {
        let diagnostic = ConversationPacketDiagnostics::version_mismatch(
            ConversationPacketBuilder::CanonicalReplay,
            0,
        );

        assert_eq!(
            diagnostic.version_status,
            ConversationPacketVersionStatus::Mismatch {
                expected: CONVERSATION_PACKET_VERSION,
                observed: 0,
            }
        );
        assert!(
            diagnostic.warnings[0].contains("conversation packet version mismatch"),
            "diagnostic should explain packet version mismatch"
        );
    }
}