Skip to main content

coding_agent_search/model/
conversation_packet.rs

1//! Versioned normalize-once conversation packet contract.
2//!
3//! A `ConversationPacket` is the canonical unit that refresh and rebuild code
4//! can hand to storage, lexical, analytics, and semantic sinks without asking
5//! each sink to re-normalize the same conversation. The contract keeps the
6//! owned canonical payload separate from lightweight sink projections so future
7//! pipelines can pass indices, counts, and hashes instead of duplicating message
8//! text in every derived structure.
9
10use serde::{Deserialize, Serialize};
11use serde_json::Value;
12use std::{borrow::Cow, ops::Range, path::Path};
13
14use crate::connectors::{NormalizedConversation, NormalizedMessage, NormalizedSnippet};
15use crate::model::types::{Conversation, Message, MessageRole, Snippet};
16
/// Version of the packet contract produced by this module.
///
/// Both packet digests are seeded with this value (see `packet_hashes`), so
/// bumping it changes every `semantic_hash` and `message_hash`.
pub const CONVERSATION_PACKET_VERSION: u32 = 1;

/// Construction path that produced a packet.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ConversationPacketBuilder {
    /// Built from a connector's `NormalizedConversation`
    /// (`ConversationPacket::from_normalized_conversation`).
    RawConnectorScan,
    /// Built from a stored `Conversation`
    /// (`ConversationPacket::from_canonical_replay`).
    CanonicalReplay,
}

/// Relationship between a packet's contract version and
/// [`CONVERSATION_PACKET_VERSION`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ConversationPacketVersionStatus {
    /// The packet was built under the current contract version.
    Current,
    /// The packet reports version `observed`, while this build expects `expected`.
    Mismatch { expected: u32, observed: u32 },
}

/// Build-time diagnostics attached to every packet.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConversationPacketDiagnostics {
    /// Construction path that produced the packet.
    pub builder: ConversationPacketBuilder,
    /// Contract version of the code that produced these diagnostics.
    pub contract_version: u32,
    /// `Current`, or a `Mismatch` recording the expected and observed versions.
    pub version_status: ConversationPacketVersionStatus,
    /// Human-readable warnings; `version_mismatch` adds one describing the mismatch.
    pub warnings: Vec<String>,
}
38
39impl ConversationPacketDiagnostics {
40    pub fn current(builder: ConversationPacketBuilder) -> Self {
41        Self {
42            builder,
43            contract_version: CONVERSATION_PACKET_VERSION,
44            version_status: ConversationPacketVersionStatus::Current,
45            warnings: Vec::new(),
46        }
47    }
48
49    pub fn version_mismatch(builder: ConversationPacketBuilder, observed: u32) -> Self {
50        Self {
51            builder,
52            contract_version: CONVERSATION_PACKET_VERSION,
53            version_status: ConversationPacketVersionStatus::Mismatch {
54                expected: CONVERSATION_PACKET_VERSION,
55                observed,
56            },
57            warnings: vec![format!(
58                "conversation packet version mismatch: expected {}, observed {}",
59                CONVERSATION_PACKET_VERSION, observed
60            )],
61        }
62    }
63}
64
/// Where a conversation came from. Every field is fed into the semantic hash.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConversationPacketProvenance {
    /// Identifier of the source; `local()` uses `"local"`.
    pub source_id: String,
    /// Kind of origin; `local()` uses `"local"`.
    pub origin_kind: String,
    /// Originating host, if any; `None` for `local()`.
    pub origin_host: Option<String>,
}
71
72impl ConversationPacketProvenance {
73    pub fn local() -> Self {
74        Self {
75            source_id: "local".to_string(),
76            origin_kind: "local".to_string(),
77            origin_host: None,
78        }
79    }
80}
81
/// Identity of the conversation a packet describes.
///
/// Paths are stored as strings converted lossily from the source paths.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConversationPacketIdentity {
    /// Database row ID; `None` when built from a raw connector scan.
    /// Not fed into the packet hashes.
    pub conversation_id: Option<i64>,
    /// Slug of the agent that produced the conversation.
    pub agent_slug: String,
    /// Identifier supplied by the source, if any.
    pub external_id: Option<String>,
    /// Workspace path, if known.
    pub workspace: Option<String>,
    /// Path of the file the conversation was read from.
    pub source_path: String,
    /// Conversation title, if any.
    pub title: Option<String>,
}

/// Conversation-level timestamps plus bounds derived from the messages.
///
/// NOTE(review): units are not fixed in this module; the test fixtures use
/// values like `1_700_000_000_000`, which suggests epoch milliseconds — confirm.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConversationPacketTimestamps {
    /// Start time reported by the source conversation.
    pub started_at: Option<i64>,
    /// End time reported by the source conversation.
    pub ended_at: Option<i64>,
    /// Smallest message `created_at`, if any message has one.
    pub first_message_at: Option<i64>,
    /// Largest message `created_at`, if any message has one.
    pub last_message_at: Option<i64>,
}

/// Code snippet attached to a message; `file_path` is converted lossily to a string.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConversationPacketSnippet {
    pub file_path: Option<String>,
    pub start_line: Option<i64>,
    pub end_line: Option<i64>,
    pub language: Option<String>,
    pub snippet_text: Option<String>,
}

/// One message in canonical packet form.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ConversationPacketMessage {
    /// Database row ID; `None` for raw connector scans. Not fed into the hashes.
    pub message_id: Option<i64>,
    /// Message position as reported by the source.
    pub idx: i64,
    /// Normalized role: `user`, `assistant`, `tool`, `system`, or another
    /// trimmed, ASCII-lowercased label.
    pub role: String,
    pub author: Option<String>,
    pub created_at: Option<i64>,
    /// Message body; this is the text the sinks consume.
    pub content: String,
    /// Extra per-message JSON carried over from the source.
    pub extra_json: Value,
    pub snippets: Vec<ConversationPacketSnippet>,
}

/// Owned canonical conversation data; hashes and projections are derived from it.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ConversationPacketPayload {
    pub identity: ConversationPacketIdentity,
    pub provenance: ConversationPacketProvenance,
    pub timestamps: ConversationPacketTimestamps,
    pub metadata_json: Value,
    pub messages: Vec<ConversationPacketMessage>,
}
129
/// Content digests that let packets be compared without comparing payloads.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConversationPacketHashes {
    /// Hex-encoded BLAKE3 digest, seeded with the contract version, of the
    /// identity, provenance, timestamps, metadata, and every message (idx,
    /// normalized role, author, created_at, content, extras, and snippets).
    /// Database row IDs (`conversation_id`, `message_id`) are intentionally
    /// excluded so raw scans and canonical replay can prove semantic
    /// equivalence for the same logical conversation.
    pub semantic_hash: String,
    /// Hex-encoded BLAKE3 digest, seeded with the contract version, of the
    /// message list alone (the same per-message fields as `semantic_hash`).
    pub message_hash: String,
}

/// Lexical-sink view of a packet, expressed as indices and a byte count.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConversationPacketLexicalProjection {
    /// Indices into `payload.messages`; the builders select every message
    /// with non-empty content.
    pub message_indices: Vec<usize>,
    /// Total UTF-8 byte length of message content.
    pub total_content_bytes: usize,
}

/// Semantic-sink view of a packet, expressed as indices and a byte count.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConversationPacketSemanticProjection {
    /// Indices into `payload.messages`; the builders select every message
    /// with non-empty content.
    pub message_indices: Vec<usize>,
    /// Total UTF-8 byte length of message content.
    pub total_content_bytes: usize,
}

/// Message counts by normalized role; unrecognized roles count as `other_messages`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConversationPacketAnalyticsProjection {
    pub user_messages: usize,
    pub assistant_messages: usize,
    pub tool_messages: usize,
    pub system_messages: usize,
    pub other_messages: usize,
}

/// Per-sink projections of a packet's payload.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConversationPacketSinkProjections {
    pub lexical: ConversationPacketLexicalProjection,
    pub semantic: ConversationPacketSemanticProjection,
    pub analytics: ConversationPacketAnalyticsProjection,
}

/// Normalize-once conversation packet handed to storage, lexical, analytics,
/// and semantic sinks.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ConversationPacket {
    /// Contract version the packet was built under.
    pub version: u32,
    pub diagnostics: ConversationPacketDiagnostics,
    /// Owned canonical data.
    pub payload: ConversationPacketPayload,
    /// Digests of `payload`.
    pub hashes: ConversationPacketHashes,
    /// Per-sink views of `payload`.
    pub projections: ConversationPacketSinkProjections,
}
177
/// Consumer for which a text batch is assembled.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ConversationPacketTextSink {
    /// Slab batches use `projections.lexical.message_indices`; the owned
    /// fallback selects messages with non-empty content.
    Lexical,
    /// Slab batches use `projections.semantic.message_indices`; the owned
    /// fallback selects messages with non-empty content.
    Semantic,
    /// Every message, including empty ones.
    Fingerprint,
}

/// How the text in a batch is held.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConversationPacketTextBatchMode {
    /// Borrowed from a `ConversationPacketTextSlab` and its packet.
    Slab,
    /// Cloned out of the packet payload.
    OwnedFallback,
}

/// A sink selection referenced a message index that does not exist.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConversationPacketProjectionError {
    /// Sink whose batch was being built.
    pub sink: ConversationPacketTextSink,
    /// The out-of-range message index.
    pub message_index: usize,
    /// Number of messages available where the lookup failed (packet or slab).
    pub message_count: usize,
}
197
198impl std::fmt::Display for ConversationPacketProjectionError {
199    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
200        write!(
201            formatter,
202            "{:?} packet projection references message index {} but packet has {} messages",
203            self.sink, self.message_index, self.message_count
204        )
205    }
206}
207
// Marker impl: no wrapped source error; the message comes from `Display`.
impl std::error::Error for ConversationPacketProjectionError {}
209
/// One message's text and metadata as handed to a sink.
///
/// In `Slab` mode the strings borrow from the slab and packet; in
/// `OwnedFallback` mode they are owned clones.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConversationPacketTextMessage<'a> {
    /// Index into the packet's `payload.messages`.
    pub message_index: usize,
    pub message_id: Option<i64>,
    pub idx: i64,
    pub role: Cow<'a, str>,
    pub author: Option<Cow<'a, str>>,
    pub created_at: Option<i64>,
    pub content: Cow<'a, str>,
}

/// Messages selected for one sink, with how their text is held.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConversationPacketTextBatch<'a> {
    pub sink: ConversationPacketTextSink,
    pub mode: ConversationPacketTextBatchMode,
    /// Content byte total reported for this batch.
    pub total_content_bytes: usize,
    // Private so callers read through `messages()`.
    messages: Vec<ConversationPacketTextMessage<'a>>,
}
228
229impl<'a> ConversationPacketTextBatch<'a> {
230    pub fn messages(&self) -> &[ConversationPacketTextMessage<'a>] {
231        &self.messages
232    }
233
234    pub fn len(&self) -> usize {
235        self.messages.len()
236    }
237
238    pub fn is_empty(&self) -> bool {
239        self.messages.is_empty()
240    }
241}
242
/// All message content of one packet concatenated into a single buffer, plus
/// per-message byte ranges, so sink batches can borrow text instead of
/// cloning each message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConversationPacketTextSlab {
    // Concatenation of every message's content, in payload order.
    text: String,
    // `message_ranges[i]` is the byte range of message `i` within `text`.
    message_ranges: Vec<Range<usize>>,
}
248
249impl ConversationPacketTextSlab {
250    pub fn from_packet(packet: &ConversationPacket) -> Self {
251        let mut text = String::with_capacity(packet_total_content_bytes(&packet.payload.messages));
252        let mut message_ranges = Vec::with_capacity(packet.payload.messages.len());
253        for message in &packet.payload.messages {
254            let start = text.len();
255            text.push_str(&message.content);
256            let end = text.len();
257            message_ranges.push(start..end);
258        }
259        Self {
260            text,
261            message_ranges,
262        }
263    }
264
265    pub fn text(&self) -> &str {
266        &self.text
267    }
268
269    pub fn message_count(&self) -> usize {
270        self.message_ranges.len()
271    }
272
273    pub fn is_empty(&self) -> bool {
274        self.text.is_empty()
275    }
276
277    pub fn message_content(&self, message_index: usize) -> Option<&str> {
278        self.message_ranges
279            .get(message_index)
280            .and_then(|range| self.text.get(range.clone()))
281    }
282
283    pub fn message_range(&self, message_index: usize) -> Option<Range<usize>> {
284        self.message_ranges.get(message_index).cloned()
285    }
286
287    pub fn sink_batch<'a>(
288        &'a self,
289        packet: &'a ConversationPacket,
290        sink: ConversationPacketTextSink,
291    ) -> Result<ConversationPacketTextBatch<'a>, ConversationPacketProjectionError> {
292        let indices = packet.sink_message_indices(sink);
293        let mut messages = Vec::with_capacity(indices.len());
294        for &message_index in indices.iter() {
295            let Some(message) = packet.payload.messages.get(message_index) else {
296                return Err(ConversationPacketProjectionError {
297                    sink,
298                    message_index,
299                    message_count: packet.payload.messages.len(),
300                });
301            };
302            let Some(content) = self.message_content(message_index) else {
303                return Err(ConversationPacketProjectionError {
304                    sink,
305                    message_index,
306                    message_count: self.message_count(),
307                });
308            };
309            messages.push(ConversationPacketTextMessage {
310                message_index,
311                message_id: message.message_id,
312                idx: message.idx,
313                role: Cow::Borrowed(message.role.as_str()),
314                author: message.author.as_deref().map(Cow::Borrowed),
315                created_at: message.created_at,
316                content: Cow::Borrowed(content),
317            });
318        }
319        Ok(ConversationPacketTextBatch {
320            sink,
321            mode: ConversationPacketTextBatchMode::Slab,
322            total_content_bytes: packet.sink_total_content_bytes(sink),
323            messages,
324        })
325    }
326}
327
328impl ConversationPacket {
329    pub fn from_normalized_conversation(
330        conversation: &NormalizedConversation,
331        provenance: ConversationPacketProvenance,
332    ) -> Self {
333        let messages = conversation
334            .messages
335            .iter()
336            .map(packet_message_from_normalized)
337            .collect::<Vec<_>>();
338        let payload = ConversationPacketPayload {
339            identity: ConversationPacketIdentity {
340                conversation_id: None,
341                agent_slug: conversation.agent_slug.clone(),
342                external_id: conversation.external_id.clone(),
343                workspace: conversation.workspace.as_deref().map(path_to_packet_string),
344                source_path: path_to_packet_string(&conversation.source_path),
345                title: conversation.title.clone(),
346            },
347            provenance,
348            timestamps: timestamps_from_parts(
349                conversation.started_at,
350                conversation.ended_at,
351                &messages,
352            ),
353            metadata_json: conversation.metadata.clone(),
354            messages,
355        };
356        Self::from_payload(payload, ConversationPacketBuilder::RawConnectorScan)
357    }
358
359    pub fn from_canonical_replay(
360        conversation: &Conversation,
361        provenance: ConversationPacketProvenance,
362    ) -> Self {
363        let messages = conversation
364            .messages
365            .iter()
366            .map(packet_message_from_canonical)
367            .collect::<Vec<_>>();
368        let payload = ConversationPacketPayload {
369            identity: ConversationPacketIdentity {
370                conversation_id: conversation.id,
371                agent_slug: conversation.agent_slug.clone(),
372                external_id: conversation.external_id.clone(),
373                workspace: conversation.workspace.as_deref().map(path_to_packet_string),
374                source_path: path_to_packet_string(&conversation.source_path),
375                title: conversation.title.clone(),
376            },
377            provenance,
378            timestamps: timestamps_from_parts(
379                conversation.started_at,
380                conversation.ended_at,
381                &messages,
382            ),
383            metadata_json: conversation.metadata_json.clone(),
384            messages,
385        };
386        Self::from_payload(payload, ConversationPacketBuilder::CanonicalReplay)
387    }
388
389    pub fn semantically_equivalent_to(&self, other: &Self) -> bool {
390        self.version == other.version
391            && self.hashes == other.hashes
392            && self.projections == other.projections
393    }
394
395    pub fn text_slab(&self) -> ConversationPacketTextSlab {
396        ConversationPacketTextSlab::from_packet(self)
397    }
398
399    pub fn owned_text_batch_fallback(
400        &self,
401        sink: ConversationPacketTextSink,
402    ) -> ConversationPacketTextBatch<'static> {
403        let indices = fallback_sink_message_indices(sink, &self.payload.messages);
404        let messages = indices
405            .into_iter()
406            .filter_map(|message_index| {
407                let message = self.payload.messages.get(message_index)?;
408                Some(ConversationPacketTextMessage {
409                    message_index,
410                    message_id: message.message_id,
411                    idx: message.idx,
412                    role: Cow::Owned(message.role.clone()),
413                    author: message.author.clone().map(Cow::Owned),
414                    created_at: message.created_at,
415                    content: Cow::Owned(message.content.clone()),
416                })
417            })
418            .collect();
419        ConversationPacketTextBatch {
420            sink,
421            mode: ConversationPacketTextBatchMode::OwnedFallback,
422            total_content_bytes: packet_total_content_bytes(&self.payload.messages),
423            messages,
424        }
425    }
426
427    fn from_payload(
428        payload: ConversationPacketPayload,
429        builder: ConversationPacketBuilder,
430    ) -> Self {
431        let hashes = packet_hashes(&payload);
432        let projections = packet_projections(&payload.messages);
433        Self {
434            version: CONVERSATION_PACKET_VERSION,
435            diagnostics: ConversationPacketDiagnostics::current(builder),
436            payload,
437            hashes,
438            projections,
439        }
440    }
441
442    fn sink_message_indices(&self, sink: ConversationPacketTextSink) -> Cow<'_, [usize]> {
443        match sink {
444            ConversationPacketTextSink::Lexical => {
445                Cow::Borrowed(&self.projections.lexical.message_indices)
446            }
447            ConversationPacketTextSink::Semantic => {
448                Cow::Borrowed(&self.projections.semantic.message_indices)
449            }
450            ConversationPacketTextSink::Fingerprint => {
451                Cow::Owned((0..self.payload.messages.len()).collect())
452            }
453        }
454    }
455
456    fn sink_total_content_bytes(&self, sink: ConversationPacketTextSink) -> usize {
457        match sink {
458            ConversationPacketTextSink::Lexical => self.projections.lexical.total_content_bytes,
459            ConversationPacketTextSink::Semantic => self.projections.semantic.total_content_bytes,
460            ConversationPacketTextSink::Fingerprint => {
461                packet_total_content_bytes(&self.payload.messages)
462            }
463        }
464    }
465}
466
467fn fallback_sink_message_indices(
468    sink: ConversationPacketTextSink,
469    messages: &[ConversationPacketMessage],
470) -> Vec<usize> {
471    match sink {
472        ConversationPacketTextSink::Lexical | ConversationPacketTextSink::Semantic => messages
473            .iter()
474            .enumerate()
475            .filter(|(_, message)| !message.content.is_empty())
476            .map(|(idx, _)| idx)
477            .collect(),
478        ConversationPacketTextSink::Fingerprint => (0..messages.len()).collect(),
479    }
480}
481
482fn packet_total_content_bytes(messages: &[ConversationPacketMessage]) -> usize {
483    messages
484        .iter()
485        .map(|message| message.content.len())
486        .sum::<usize>()
487}
488
/// Converts a path to an owned string, replacing any non-UTF-8 sequences
/// with U+FFFD (via `Path::to_string_lossy`).
fn path_to_packet_string(path: &Path) -> String {
    String::from(path.to_string_lossy())
}
492
/// Canonicalizes a free-form role label.
///
/// The label is trimmed and ASCII-lowercased, and the `agent` alias becomes
/// `assistant`. Every other label (including `user`, `tool`, `system`, and
/// unknown roles) is returned in its trimmed, lowercased form.
fn normalize_role(role: &str) -> String {
    let lowered = role.trim().to_ascii_lowercase();
    if lowered == "agent" {
        return "assistant".to_string();
    }
    lowered
}
502
503fn canonical_role(role: &MessageRole) -> String {
504    match role {
505        MessageRole::User => "user".to_string(),
506        MessageRole::Agent => "assistant".to_string(),
507        MessageRole::Tool => "tool".to_string(),
508        MessageRole::System => "system".to_string(),
509        MessageRole::Other(other) => normalize_role(other),
510    }
511}
512
513fn packet_message_from_normalized(message: &NormalizedMessage) -> ConversationPacketMessage {
514    ConversationPacketMessage {
515        message_id: None,
516        idx: message.idx,
517        role: normalize_role(&message.role),
518        author: message.author.clone(),
519        created_at: message.created_at,
520        content: message.content.clone(),
521        extra_json: message.extra.clone(),
522        snippets: message
523            .snippets
524            .iter()
525            .map(packet_snippet_from_normalized)
526            .collect(),
527    }
528}
529
530fn packet_message_from_canonical(message: &Message) -> ConversationPacketMessage {
531    ConversationPacketMessage {
532        message_id: message.id,
533        idx: message.idx,
534        role: canonical_role(&message.role),
535        author: message.author.clone(),
536        created_at: message.created_at,
537        content: message.content.clone(),
538        extra_json: message.extra_json.clone(),
539        snippets: message
540            .snippets
541            .iter()
542            .map(packet_snippet_from_canonical)
543            .collect(),
544    }
545}
546
547fn packet_snippet_from_normalized(snippet: &NormalizedSnippet) -> ConversationPacketSnippet {
548    ConversationPacketSnippet {
549        file_path: snippet.file_path.as_deref().map(path_to_packet_string),
550        start_line: snippet.start_line,
551        end_line: snippet.end_line,
552        language: snippet.language.clone(),
553        snippet_text: snippet.snippet_text.clone(),
554    }
555}
556
557fn packet_snippet_from_canonical(snippet: &Snippet) -> ConversationPacketSnippet {
558    ConversationPacketSnippet {
559        file_path: snippet.file_path.as_deref().map(path_to_packet_string),
560        start_line: snippet.start_line,
561        end_line: snippet.end_line,
562        language: snippet.language.clone(),
563        snippet_text: snippet.snippet_text.clone(),
564    }
565}
566
567fn timestamps_from_parts(
568    started_at: Option<i64>,
569    ended_at: Option<i64>,
570    messages: &[ConversationPacketMessage],
571) -> ConversationPacketTimestamps {
572    let first_message_at = messages
573        .iter()
574        .filter_map(|message| message.created_at)
575        .min();
576    let last_message_at = messages
577        .iter()
578        .filter_map(|message| message.created_at)
579        .max();
580    ConversationPacketTimestamps {
581        started_at,
582        ended_at,
583        first_message_at,
584        last_message_at,
585    }
586}
587
588fn packet_projections(messages: &[ConversationPacketMessage]) -> ConversationPacketSinkProjections {
589    let message_indices = messages
590        .iter()
591        .enumerate()
592        .filter(|(_, message)| !message.content.is_empty())
593        .map(|(idx, _)| idx)
594        .collect::<Vec<_>>();
595    let total_content_bytes = messages
596        .iter()
597        .map(|message| message.content.len())
598        .sum::<usize>();
599    let mut analytics = ConversationPacketAnalyticsProjection {
600        user_messages: 0,
601        assistant_messages: 0,
602        tool_messages: 0,
603        system_messages: 0,
604        other_messages: 0,
605    };
606    for message in messages {
607        match message.role.as_str() {
608            "user" => analytics.user_messages += 1,
609            "assistant" => analytics.assistant_messages += 1,
610            "tool" => analytics.tool_messages += 1,
611            "system" => analytics.system_messages += 1,
612            _ => analytics.other_messages += 1,
613        }
614    }
615    ConversationPacketSinkProjections {
616        lexical: ConversationPacketLexicalProjection {
617            message_indices: message_indices.clone(),
618            total_content_bytes,
619        },
620        semantic: ConversationPacketSemanticProjection {
621            message_indices,
622            total_content_bytes,
623        },
624        analytics,
625    }
626}
627
628fn packet_hashes(payload: &ConversationPacketPayload) -> ConversationPacketHashes {
629    let mut semantic = blake3::Hasher::new();
630    update_u32(&mut semantic, "version", CONVERSATION_PACKET_VERSION);
631    update_identity_hash(&mut semantic, &payload.identity);
632    update_provenance_hash(&mut semantic, &payload.provenance);
633    update_timestamps_hash(&mut semantic, &payload.timestamps);
634    update_json(&mut semantic, "metadata_json", &payload.metadata_json);
635    update_messages_hash(&mut semantic, &payload.messages);
636
637    let mut messages = blake3::Hasher::new();
638    update_u32(&mut messages, "version", CONVERSATION_PACKET_VERSION);
639    update_messages_hash(&mut messages, &payload.messages);
640
641    ConversationPacketHashes {
642        semantic_hash: semantic.finalize().to_hex().to_string(),
643        message_hash: messages.finalize().to_hex().to_string(),
644    }
645}
646
/// Feeds the identity fields into `hasher` in a fixed order.
///
/// `conversation_id` is deliberately not hashed, so database row IDs do not
/// affect the digest. The field order is part of the hash format.
fn update_identity_hash(hasher: &mut blake3::Hasher, identity: &ConversationPacketIdentity) {
    update_str(hasher, "agent_slug", &identity.agent_slug);
    update_opt_str(hasher, "external_id", identity.external_id.as_deref());
    update_opt_str(hasher, "workspace", identity.workspace.as_deref());
    update_str(hasher, "source_path", &identity.source_path);
    update_opt_str(hasher, "title", identity.title.as_deref());
}

/// Feeds provenance (source ID, origin kind, optional origin host) into `hasher`.
fn update_provenance_hash(hasher: &mut blake3::Hasher, provenance: &ConversationPacketProvenance) {
    update_str(hasher, "source_id", &provenance.source_id);
    update_str(hasher, "origin_kind", &provenance.origin_kind);
    update_opt_str(hasher, "origin_host", provenance.origin_host.as_deref());
}

/// Feeds all four optional timestamps into `hasher`. `None` is tag-encoded,
/// so it hashes differently from any present value.
fn update_timestamps_hash(hasher: &mut blake3::Hasher, timestamps: &ConversationPacketTimestamps) {
    update_opt_i64(hasher, "started_at", timestamps.started_at);
    update_opt_i64(hasher, "ended_at", timestamps.ended_at);
    update_opt_i64(hasher, "first_message_at", timestamps.first_message_at);
    update_opt_i64(hasher, "last_message_at", timestamps.last_message_at);
}
667
/// Feeds the message list into `hasher`.
///
/// The list is prefixed with its length, and each message's snippets are
/// prefixed with their count, so adjacent messages and snippets cannot be
/// confused. `message_id` is deliberately not hashed. The field order is part
/// of the hash format.
fn update_messages_hash(hasher: &mut blake3::Hasher, messages: &[ConversationPacketMessage]) {
    update_usize(hasher, "message_count", messages.len());
    for message in messages {
        update_i64(hasher, "message_idx", message.idx);
        update_str(hasher, "message_role", &message.role);
        update_opt_str(hasher, "message_author", message.author.as_deref());
        update_opt_i64(hasher, "message_created_at", message.created_at);
        update_str(hasher, "message_content", &message.content);
        update_json(hasher, "message_extra_json", &message.extra_json);
        update_usize(hasher, "snippet_count", message.snippets.len());
        for snippet in &message.snippets {
            update_opt_str(hasher, "snippet_file_path", snippet.file_path.as_deref());
            update_opt_i64(hasher, "snippet_start_line", snippet.start_line);
            update_opt_i64(hasher, "snippet_end_line", snippet.end_line);
            update_opt_str(hasher, "snippet_language", snippet.language.as_deref());
            update_opt_str(hasher, "snippet_text", snippet.snippet_text.as_deref());
        }
    }
}
687
688fn update_label(hasher: &mut blake3::Hasher, label: &str) {
689    hasher.update(label.as_bytes());
690    hasher.update(&[0]);
691}
692
693fn update_str(hasher: &mut blake3::Hasher, label: &str, value: &str) {
694    update_label(hasher, label);
695    update_usize(hasher, "len", value.len());
696    hasher.update(value.as_bytes());
697}
698
699fn update_opt_str(hasher: &mut blake3::Hasher, label: &str, value: Option<&str>) {
700    match value {
701        Some(value) => {
702            update_label(hasher, label);
703            hasher.update(&[1]);
704            update_usize(hasher, "len", value.len());
705            hasher.update(value.as_bytes());
706        }
707        None => {
708            update_label(hasher, label);
709            hasher.update(&[0]);
710        }
711    }
712}
713
714fn update_json(hasher: &mut blake3::Hasher, label: &str, value: &Value) {
715    let stable = serde_json::to_string(value).unwrap_or_else(|_| "null".to_string());
716    update_str(hasher, label, &stable);
717}
718
719fn update_i64(hasher: &mut blake3::Hasher, label: &str, value: i64) {
720    update_label(hasher, label);
721    hasher.update(&value.to_le_bytes());
722}
723
724fn update_opt_i64(hasher: &mut blake3::Hasher, label: &str, value: Option<i64>) {
725    update_label(hasher, label);
726    match value {
727        Some(value) => {
728            hasher.update(&[1]);
729            hasher.update(&value.to_le_bytes());
730        }
731        None => {
732            hasher.update(&[0]);
733        }
734    }
735}
736
737fn update_u32(hasher: &mut blake3::Hasher, label: &str, value: u32) {
738    update_label(hasher, label);
739    hasher.update(&value.to_le_bytes());
740}
741
742fn update_usize(hasher: &mut blake3::Hasher, label: &str, value: usize) {
743    update_label(hasher, label);
744    let value = u64::try_from(value).unwrap_or(u64::MAX);
745    hasher.update(&value.to_le_bytes());
746}
747
748#[cfg(test)]
749mod tests {
750    use super::*;
751    use crate::connectors::{NormalizedConversation, NormalizedMessage, NormalizedSnippet};
752    use crate::model::types::{Conversation, Message, MessageRole, Snippet};
753    use serde_json::json;
754    use std::path::PathBuf;
755
756    fn raw_conversation() -> NormalizedConversation {
757        NormalizedConversation {
758            agent_slug: "codex".to_string(),
759            external_id: Some("session-1".to_string()),
760            title: Some("Packet contract".to_string()),
761            workspace: Some(PathBuf::from("/work/cass")),
762            source_path: PathBuf::from("/work/cass/.codex/session.jsonl"),
763            started_at: Some(1_700_000_000_000),
764            ended_at: Some(1_700_000_010_000),
765            metadata: json!({"model": "gpt-5", "temperature": 0}),
766            messages: vec![
767                NormalizedMessage {
768                    idx: 0,
769                    role: "user".to_string(),
770                    author: Some("human".to_string()),
771                    created_at: Some(1_700_000_000_000),
772                    content: "build the packet".to_string(),
773                    extra: json!({"turn": 1}),
774                    snippets: vec![NormalizedSnippet {
775                        file_path: Some(PathBuf::from("src/main.rs")),
776                        start_line: Some(10),
777                        end_line: Some(12),
778                        language: Some("rust".to_string()),
779                        snippet_text: Some("fn main() {}".to_string()),
780                    }],
781                    invocations: Vec::new(),
782                },
783                NormalizedMessage {
784                    idx: 1,
785                    role: "assistant".to_string(),
786                    author: None,
787                    created_at: Some(1_700_000_001_000),
788                    content: "packet built".to_string(),
789                    extra: json!({}),
790                    snippets: Vec::new(),
791                    invocations: Vec::new(),
792                },
793            ],
794        }
795    }
796
797    fn canonical_conversation() -> Conversation {
798        Conversation {
799            id: Some(42),
800            agent_slug: "codex".to_string(),
801            workspace: Some(PathBuf::from("/work/cass")),
802            external_id: Some("session-1".to_string()),
803            title: Some("Packet contract".to_string()),
804            source_path: PathBuf::from("/work/cass/.codex/session.jsonl"),
805            started_at: Some(1_700_000_000_000),
806            ended_at: Some(1_700_000_010_000),
807            approx_tokens: None,
808            metadata_json: json!({"model": "gpt-5", "temperature": 0}),
809            source_id: "local".to_string(),
810            origin_host: None,
811            messages: vec![
812                Message {
813                    id: Some(100),
814                    idx: 0,
815                    role: MessageRole::User,
816                    author: Some("human".to_string()),
817                    created_at: Some(1_700_000_000_000),
818                    content: "build the packet".to_string(),
819                    extra_json: json!({"turn": 1}),
820                    snippets: vec![Snippet {
821                        id: Some(7),
822                        file_path: Some(PathBuf::from("src/main.rs")),
823                        start_line: Some(10),
824                        end_line: Some(12),
825                        language: Some("rust".to_string()),
826                        snippet_text: Some("fn main() {}".to_string()),
827                    }],
828                },
829                Message {
830                    id: Some(101),
831                    idx: 1,
832                    role: MessageRole::Agent,
833                    author: None,
834                    created_at: Some(1_700_000_001_000),
835                    content: "packet built".to_string(),
836                    extra_json: json!({}),
837                    snippets: Vec::new(),
838                },
839            ],
840        }
841    }
842
843    #[test]
844    fn raw_and_canonical_builders_produce_equivalent_packet_semantics() {
845        let provenance = ConversationPacketProvenance::local();
846        let raw = ConversationPacket::from_normalized_conversation(
847            &raw_conversation(),
848            provenance.clone(),
849        );
850        let canonical =
851            ConversationPacket::from_canonical_replay(&canonical_conversation(), provenance);
852
853        assert_eq!(raw.version, CONVERSATION_PACKET_VERSION);
854        assert!(raw.semantically_equivalent_to(&canonical));
855        assert_eq!(raw.payload.messages[1].role, "assistant");
856        assert_eq!(canonical.payload.messages[1].role, "assistant");
857        assert_eq!(raw.projections.lexical.message_indices, vec![0, 1]);
858        assert_eq!(raw.projections.analytics.user_messages, 1);
859        assert_eq!(raw.projections.analytics.assistant_messages, 1);
860    }
861
862    #[test]
863    fn packet_hash_changes_when_normalized_content_changes() {
864        let mut changed = raw_conversation();
865        changed.messages[1].content = "packet changed".to_string();
866
867        let original = ConversationPacket::from_normalized_conversation(
868            &raw_conversation(),
869            ConversationPacketProvenance::local(),
870        );
871        let changed = ConversationPacket::from_normalized_conversation(
872            &changed,
873            ConversationPacketProvenance::local(),
874        );
875
876        assert_ne!(original.hashes.semantic_hash, changed.hashes.semantic_hash);
877        assert_ne!(original.hashes.message_hash, changed.hashes.message_hash);
878    }
879
880    #[test]
881    fn text_slab_reuses_one_utf8_arena_for_packet_sinks() {
882        let mut canonical = canonical_conversation();
883        canonical.messages[0].content = format!("build {} packet", "\u{2603}");
884        canonical.messages.push(Message {
885            id: Some(102),
886            idx: 2,
887            role: MessageRole::System,
888            author: None,
889            created_at: Some(1_700_000_002_000),
890            content: String::new(),
891            extra_json: json!({}),
892            snippets: Vec::new(),
893        });
894        let packet = ConversationPacket::from_canonical_replay(
895            &canonical,
896            ConversationPacketProvenance::local(),
897        );
898        let slab = packet.text_slab();
899
900        assert_eq!(slab.message_count(), 3);
901        let range = slab
902            .message_range(0)
903            .expect("first message should have a slab range");
904        assert!(slab.text().is_char_boundary(range.start));
905        assert!(slab.text().is_char_boundary(range.end));
906        assert_eq!(
907            slab.message_content(0),
908            Some(packet.payload.messages[0].content.as_str())
909        );
910
911        let lexical = slab
912            .sink_batch(&packet, ConversationPacketTextSink::Lexical)
913            .expect("lexical projection should borrow from the slab");
914        let semantic = slab
915            .sink_batch(&packet, ConversationPacketTextSink::Semantic)
916            .expect("semantic projection should borrow from the slab");
917        let fingerprint = slab
918            .sink_batch(&packet, ConversationPacketTextSink::Fingerprint)
919            .expect("fingerprint projection should cover all messages");
920
921        assert_eq!(lexical.mode, ConversationPacketTextBatchMode::Slab);
922        assert_eq!(lexical.len(), 2, "empty content stays out of lexical");
923        assert_eq!(semantic.len(), 2, "empty content stays out of semantic");
924        assert_eq!(fingerprint.len(), 3, "fingerprint sees every message");
925        assert!(fingerprint.messages()[2].content.is_empty());
926
927        let slab_content = slab
928            .message_content(0)
929            .expect("first message should be readable from the slab");
930        assert!(std::ptr::eq(
931            lexical.messages()[0].content.as_ref().as_ptr(),
932            slab_content.as_ptr()
933        ));
934        assert!(std::ptr::eq(
935            semantic.messages()[0].content.as_ref().as_ptr(),
936            slab_content.as_ptr()
937        ));
938        assert_eq!(
939            lexical.messages()[0].content.as_ref(),
940            "build \u{2603} packet"
941        );
942    }
943
944    #[test]
945    fn owned_text_batch_fallback_recovers_from_bad_projection() {
946        let mut packet = ConversationPacket::from_canonical_replay(
947            &canonical_conversation(),
948            ConversationPacketProvenance::local(),
949        );
950        packet.projections.semantic.message_indices = vec![0, 99];
951        let slab = packet.text_slab();
952        let err = slab
953            .sink_batch(&packet, ConversationPacketTextSink::Semantic)
954            .expect_err("bad projection should not build a slab view");
955
956        assert_eq!(err.sink, ConversationPacketTextSink::Semantic);
957        assert_eq!(err.message_index, 99);
958        assert_eq!(err.message_count, packet.payload.messages.len());
959
960        let fallback = packet.owned_text_batch_fallback(ConversationPacketTextSink::Semantic);
961        assert_eq!(
962            fallback.mode,
963            ConversationPacketTextBatchMode::OwnedFallback
964        );
965        assert_eq!(fallback.len(), 2);
966        assert!(
967            matches!(fallback.messages()[0].content, Cow::Owned(_)),
968            "fallback should own content instead of borrowing from the slab"
969        );
970        assert_eq!(fallback.messages()[0].content.as_ref(), "build the packet");
971    }
972
973    #[test]
974    fn version_mismatch_diagnostic_is_explicit() {
975        let diagnostic = ConversationPacketDiagnostics::version_mismatch(
976            ConversationPacketBuilder::CanonicalReplay,
977            0,
978        );
979
980        assert_eq!(
981            diagnostic.version_status,
982            ConversationPacketVersionStatus::Mismatch {
983                expected: CONVERSATION_PACKET_VERSION,
984                observed: 0,
985            }
986        );
987        assert!(
988            diagnostic.warnings[0].contains("conversation packet version mismatch"),
989            "diagnostic should explain packet version mismatch"
990        );
991    }
992}