Skip to main content

fsqlite_ext_session/
lib.rs

1use fsqlite_types::serial_type::{read_varint, varint_len, write_varint};
2use fsqlite_types::value::SqliteValue;
3
4// ---------------------------------------------------------------------------
5// Constants
6// ---------------------------------------------------------------------------
7
/// Changeset table header marker byte ('T').
///
/// Written as the first byte of a table header by [`TableInfo::encode`] and
/// required at that position by [`TableInfo::decode`].
const CHANGESET_TABLE_HEADER_BYTE: u8 = 0x54;

/// Patchset table header marker byte ('P').
///
/// Written as the first byte of a table header by
/// [`TableInfo::encode_patchset`] and required at that position by
/// [`TableInfo::decode_patchset`].
const PATCHSET_TABLE_HEADER_BYTE: u8 = 0x50;

/// Legacy alias retained for internal tests that refer to the changeset header byte.
#[cfg(test)]
const TABLE_HEADER_BYTE: u8 = CHANGESET_TABLE_HEADER_BYTE;

// Operation codes used in changeset/patchset binary format. Each row record
// starts with one of these bytes (see `ChangeOp::as_byte` / `from_byte`).

/// Operation code for [`ChangeOp::Insert`].
const OP_INSERT: u8 = 0x12; // 18
/// Operation code for [`ChangeOp::Delete`].
const OP_DELETE: u8 = 0x09; // 9
/// Operation code for [`ChangeOp::Update`].
const OP_UPDATE: u8 = 0x17; // 23

// Value type markers in the changeset binary format. Each encoded value
// starts with one of these bytes (see `ChangesetValue::encode` / `decode`).

/// Marker for [`ChangesetValue::Undefined`]; no payload follows.
const VAL_UNDEFINED: u8 = 0x00;
/// Marker for [`ChangesetValue::Integer`]; followed by 8 big-endian bytes.
const VAL_INTEGER: u8 = 0x01;
/// Marker for [`ChangesetValue::Real`]; followed by 8 big-endian bytes.
const VAL_REAL: u8 = 0x02;
/// Marker for [`ChangesetValue::Text`]; followed by a varint length and the bytes.
const VAL_TEXT: u8 = 0x03;
/// Marker for [`ChangesetValue::Blob`]; followed by a varint length and the bytes.
const VAL_BLOB: u8 = 0x04;
/// Marker for [`ChangesetValue::Null`]; no payload follows.
const VAL_NULL: u8 = 0x05;
30
31// ---------------------------------------------------------------------------
32// Public API — extension name
33// ---------------------------------------------------------------------------
34
/// Returns the name of this extension, `"session"`.
#[must_use]
pub const fn extension_name() -> &'static str {
    const NAME: &str = "session";
    NAME
}
39
40// ---------------------------------------------------------------------------
41// Change operations
42// ---------------------------------------------------------------------------
43
/// The kind of DML operation recorded in a changeset.
///
/// Each variant maps to one operation byte in the binary format; see
/// [`ChangeOp::as_byte`] and [`ChangeOp::from_byte`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeOp {
    /// A row was inserted (encoded as `OP_INSERT`).
    Insert,
    /// A row was deleted (encoded as `OP_DELETE`).
    Delete,
    /// A row was updated (encoded as `OP_UPDATE`).
    Update,
}
51
52impl ChangeOp {
53    #[must_use]
54    pub const fn as_byte(self) -> u8 {
55        match self {
56            Self::Insert => OP_INSERT,
57            Self::Delete => OP_DELETE,
58            Self::Update => OP_UPDATE,
59        }
60    }
61
62    /// Decode an operation byte from the changeset format.
63    ///
64    /// Returns `None` for unrecognised bytes.
65    #[must_use]
66    pub const fn from_byte(b: u8) -> Option<Self> {
67        match b {
68            OP_INSERT => Some(Self::Insert),
69            OP_DELETE => Some(Self::Delete),
70            OP_UPDATE => Some(Self::Update),
71            _ => None,
72        }
73    }
74}
75
76// ---------------------------------------------------------------------------
77// Conflict types and actions
78// ---------------------------------------------------------------------------
79
/// The category of conflict encountered while applying a changeset.
///
/// Each variant names one reason an incoming change could not be applied
/// exactly as recorded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConflictType {
    /// The row exists but its current values differ from the expected old values.
    Data,
    /// The row to update or delete does not exist in the target database.
    NotFound,
    /// A unique-constraint violation occurred (e.g. duplicate key on INSERT).
    Conflict,
    /// A non-unique constraint violation occurred (CHECK, NOT NULL, etc.).
    Constraint,
    /// A foreign-key constraint violation occurred.
    ForeignKey,
}
94
/// The action the caller wants the apply engine to take for a conflict.
///
/// This is the caller's answer to a reported [`ConflictType`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConflictAction {
    /// Skip this change and continue applying the rest of the changeset.
    OmitChange,
    /// Overwrite the conflicting row with the incoming change.
    Replace,
    /// Abort the entire apply operation immediately.
    Abort,
}
105
106// ---------------------------------------------------------------------------
107// Changeset value encoding / decoding
108// ---------------------------------------------------------------------------
109
/// A single column value in the changeset binary format.
///
/// `Undefined` is used in UPDATE records for columns that did not change.
///
/// Every encoded value starts with a one-byte type marker; see
/// [`ChangesetValue::encode`] for the payload that follows each marker.
#[derive(Debug, Clone, PartialEq)]
pub enum ChangesetValue {
    /// No value recorded; encoded as the marker byte alone.
    Undefined,
    /// SQL NULL; encoded as the marker byte alone.
    Null,
    /// 64-bit signed integer; encoded as 8 big-endian bytes.
    Integer(i64),
    /// 64-bit float; encoded as 8 big-endian bytes.
    Real(f64),
    /// UTF-8 text; encoded as a varint byte length followed by the bytes.
    Text(String),
    /// Raw bytes; encoded as a varint byte length followed by the bytes.
    Blob(Vec<u8>),
}
122
123impl ChangesetValue {
124    /// Convert from a [`SqliteValue`].
125    #[must_use]
126    pub fn from_sqlite(val: &SqliteValue) -> Self {
127        match val {
128            SqliteValue::Null => Self::Null,
129            SqliteValue::Integer(i) => Self::Integer(*i),
130            SqliteValue::Float(f) => Self::Real(*f),
131            SqliteValue::Text(s) => Self::Text(s.to_string()),
132            SqliteValue::Blob(b) => Self::Blob(b.to_vec()),
133        }
134    }
135
136    /// Convert to a [`SqliteValue`], mapping `Undefined` to `Null`.
137    #[must_use]
138    pub fn to_sqlite(&self) -> SqliteValue {
139        use std::sync::Arc;
140        match self {
141            Self::Undefined | Self::Null => SqliteValue::Null,
142            Self::Integer(i) => SqliteValue::Integer(*i),
143            Self::Real(f) => SqliteValue::Float(*f),
144            Self::Text(s) => SqliteValue::Text(Arc::from(s.as_str())),
145            Self::Blob(b) => SqliteValue::Blob(Arc::from(b.as_slice())),
146        }
147    }
148
149    /// Encode this value into the changeset binary format, appending to `out`.
150    pub fn encode(&self, out: &mut Vec<u8>) {
151        match self {
152            Self::Undefined => {
153                out.push(VAL_UNDEFINED);
154            }
155            Self::Null => {
156                out.push(VAL_NULL);
157            }
158            Self::Integer(i) => {
159                out.push(VAL_INTEGER);
160                out.extend_from_slice(&i.to_be_bytes());
161            }
162            Self::Real(f) => {
163                out.push(VAL_REAL);
164                out.extend_from_slice(&f.to_be_bytes());
165            }
166            Self::Text(s) => {
167                out.push(VAL_TEXT);
168                let bytes = s.as_bytes();
169                let mut vbuf = [0u8; 9];
170                let vlen = write_varint(&mut vbuf, bytes.len() as u64);
171                out.extend_from_slice(&vbuf[..vlen]);
172                out.extend_from_slice(bytes);
173            }
174            Self::Blob(b) => {
175                out.push(VAL_BLOB);
176                let mut vbuf = [0u8; 9];
177                let vlen = write_varint(&mut vbuf, b.len() as u64);
178                out.extend_from_slice(&vbuf[..vlen]);
179                out.extend_from_slice(b);
180            }
181        }
182    }
183
184    /// Decode a single value from `data` starting at `pos`.
185    ///
186    /// Returns `(value, bytes_consumed)` or `None` on malformed input.
187    pub fn decode(data: &[u8], pos: usize) -> Option<(Self, usize)> {
188        let type_byte = *data.get(pos)?;
189        let mut offset = pos + 1;
190        match type_byte {
191            VAL_UNDEFINED => Some((Self::Undefined, offset - pos)),
192            VAL_NULL => Some((Self::Null, offset - pos)),
193            VAL_INTEGER => {
194                let end = offset + 8;
195                if data.len() < end {
196                    return None;
197                }
198                let arr: [u8; 8] = data[offset..end].try_into().ok()?;
199                Some((Self::Integer(i64::from_be_bytes(arr)), end - pos))
200            }
201            VAL_REAL => {
202                let end = offset + 8;
203                if data.len() < end {
204                    return None;
205                }
206                let arr: [u8; 8] = data[offset..end].try_into().ok()?;
207                Some((Self::Real(f64::from_be_bytes(arr)), end - pos))
208            }
209            VAL_TEXT => {
210                let (len, vlen) = read_varint(&data[offset..])?;
211                offset += vlen;
212                let len = usize::try_from(len).ok()?;
213                let end = offset + len;
214                if data.len() < end {
215                    return None;
216                }
217                let s = std::str::from_utf8(&data[offset..end]).ok()?;
218                Some((Self::Text(s.into()), end - pos))
219            }
220            VAL_BLOB => {
221                let (len, vlen) = read_varint(&data[offset..])?;
222                offset += vlen;
223                let len = usize::try_from(len).ok()?;
224                let end = offset + len;
225                if data.len() < end {
226                    return None;
227                }
228                Some((Self::Blob(data[offset..end].to_vec()), end - pos))
229            }
230            _ => None,
231        }
232    }
233}
234
235// ---------------------------------------------------------------------------
236// Table info carried in the changeset
237// ---------------------------------------------------------------------------
238
/// Per-table metadata stored in the changeset header.
///
/// In the binary format a header is: a marker byte, the column count as a
/// varint, one flag byte per column, then the NUL-terminated table name.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TableInfo {
    /// Table name. Written NUL-terminated, so it is expected not to contain
    /// a NUL byte itself.
    pub name: String,
    /// Number of columns.
    pub column_count: usize,
    /// For each column, `true` if it is part of the primary key.
    ///
    /// The decoder always produces exactly `column_count` entries.
    pub pk_flags: Vec<bool>,
}
249
250impl TableInfo {
251    fn encode_with_header(&self, out: &mut Vec<u8>, header_byte: u8) {
252        out.push(header_byte);
253        let mut vbuf = [0u8; 9];
254        let vlen = write_varint(&mut vbuf, self.column_count as u64);
255        out.extend_from_slice(&vbuf[..vlen]);
256        for &pk in &self.pk_flags {
257            out.push(u8::from(pk));
258        }
259        out.extend_from_slice(self.name.as_bytes());
260        out.push(0x00); // NUL terminator
261    }
262
263    /// Encode the table header into changeset binary format.
264    pub fn encode(&self, out: &mut Vec<u8>) {
265        self.encode_with_header(out, CHANGESET_TABLE_HEADER_BYTE);
266    }
267
268    /// Encode the table header into patchset binary format.
269    pub fn encode_patchset(&self, out: &mut Vec<u8>) {
270        self.encode_with_header(out, PATCHSET_TABLE_HEADER_BYTE);
271    }
272
273    /// Decode a table header starting at `pos`.
274    ///
275    /// Returns `(TableInfo, bytes_consumed)` or `None`.
276    fn decode_with_header(data: &[u8], pos: usize, header_byte: u8) -> Option<(Self, usize)> {
277        if *data.get(pos)? != header_byte {
278            return None;
279        }
280        let mut offset = pos + 1;
281        let (col_count, vlen) = read_varint(&data[offset..])?;
282        offset += vlen;
283        let col_count = usize::try_from(col_count).ok()?;
284        if data.len() < offset + col_count {
285            return None;
286        }
287        let pk_flags: Vec<bool> = data[offset..offset + col_count]
288            .iter()
289            .map(|&b| b != 0)
290            .collect();
291        offset += col_count;
292        // Read NUL-terminated table name.
293        let name_start = offset;
294        let nul_pos = data[name_start..].iter().position(|&b| b == 0)?;
295        let name = std::str::from_utf8(&data[name_start..name_start + nul_pos])
296            .ok()?
297            .to_owned();
298        offset = name_start + nul_pos + 1;
299        Some((
300            Self {
301                name,
302                column_count: col_count,
303                pk_flags,
304            },
305            offset - pos,
306        ))
307    }
308
309    pub fn decode(data: &[u8], pos: usize) -> Option<(Self, usize)> {
310        Self::decode_with_header(data, pos, CHANGESET_TABLE_HEADER_BYTE)
311    }
312
313    pub fn decode_patchset(data: &[u8], pos: usize) -> Option<(Self, usize)> {
314        Self::decode_with_header(data, pos, PATCHSET_TABLE_HEADER_BYTE)
315    }
316}
317
318// ---------------------------------------------------------------------------
319// Change row
320// ---------------------------------------------------------------------------
321
/// A single row change recorded in a changeset.
///
/// Which of the two value vectors is populated depends on `op`; see the
/// field docs. Within UPDATE rows, [`ChangesetValue::Undefined`] marks a
/// column for which no value is recorded.
#[derive(Debug, Clone, PartialEq)]
pub struct ChangesetRow {
    /// The operation this row records.
    pub op: ChangeOp,
    /// For DELETE and UPDATE: the old column values. Empty for INSERT.
    pub old_values: Vec<ChangesetValue>,
    /// For INSERT and UPDATE: the new column values. Empty for DELETE.
    pub new_values: Vec<ChangesetValue>,
}
331
332impl ChangesetRow {
333    /// Encode this row change into changeset binary format.
334    pub fn encode_changeset(&self, out: &mut Vec<u8>) {
335        out.push(self.op.as_byte());
336        out.push(0x00);
337        match self.op {
338            ChangeOp::Insert => {
339                for v in &self.new_values {
340                    v.encode(out);
341                }
342            }
343            ChangeOp::Delete => {
344                for v in &self.old_values {
345                    v.encode(out);
346                }
347            }
348            ChangeOp::Update => {
349                for v in &self.old_values {
350                    v.encode(out);
351                }
352                for v in &self.new_values {
353                    v.encode(out);
354                }
355            }
356        }
357    }
358
359    /// Encode this row change into patchset binary format.
360    ///
361    /// For INSERT and DELETE this is identical to changeset encoding.
362    /// For UPDATE the patchset stores a single record whose PK slots contain
363    /// the original key values and whose non-PK slots contain changed new
364    /// values or [`ChangesetValue::Undefined`] for unchanged columns.
365    pub fn encode_patchset(&self, out: &mut Vec<u8>, pk_flags: &[bool]) {
366        out.push(self.op.as_byte());
367        out.push(0x00);
368        match self.op {
369            ChangeOp::Insert => {
370                for v in &self.new_values {
371                    v.encode(out);
372                }
373            }
374            ChangeOp::Delete => {
375                for (i, v) in self.old_values.iter().enumerate() {
376                    if pk_flags.get(i).copied().unwrap_or(false) {
377                        v.encode(out);
378                    }
379                }
380            }
381            ChangeOp::Update => {
382                for (index, new_value) in self.new_values.iter().enumerate() {
383                    if pk_flags.get(index).copied().unwrap_or(false) {
384                        self.old_values
385                            .get(index)
386                            .unwrap_or(&ChangesetValue::Undefined)
387                            .encode(out);
388                    } else {
389                        new_value.encode(out);
390                    }
391                }
392            }
393        }
394    }
395
396    /// Decode one changeset row starting at `pos`, given the column count.
397    pub fn decode_changeset(data: &[u8], pos: usize, col_count: usize) -> Option<(Self, usize)> {
398        let op = ChangeOp::from_byte(*data.get(pos)?)?;
399        let mut offset = pos + 2;
400        let _indirect = *data.get(pos + 1)?;
401
402        let decode_n = |data: &[u8], offset: &mut usize, n: usize| -> Option<Vec<ChangesetValue>> {
403            let mut vals = Vec::with_capacity(n);
404            for _ in 0..n {
405                let (v, consumed) = ChangesetValue::decode(data, *offset)?;
406                *offset += consumed;
407                vals.push(v);
408            }
409            Some(vals)
410        };
411
412        let (old_values, new_values) = match op {
413            ChangeOp::Insert => {
414                let new_values = decode_n(data, &mut offset, col_count)?;
415                (Vec::new(), new_values)
416            }
417            ChangeOp::Delete => {
418                let old_values = decode_n(data, &mut offset, col_count)?;
419                (old_values, Vec::new())
420            }
421            ChangeOp::Update => {
422                let old_values = decode_n(data, &mut offset, col_count)?;
423                let new_values = decode_n(data, &mut offset, col_count)?;
424                (old_values, new_values)
425            }
426        };
427
428        Some((
429            Self {
430                op,
431                old_values,
432                new_values,
433            },
434            offset - pos,
435        ))
436    }
437
438    /// Decode one patchset row starting at `pos`.
439    ///
440    /// Patchset UPDATE rows only store primary-key old values. Non-PK old
441    /// values are reconstructed as [`ChangesetValue::Undefined`] so the
442    /// decoded row can reuse the normal apply path.
443    pub fn decode_patchset(
444        data: &[u8],
445        pos: usize,
446        col_count: usize,
447        pk_flags: &[bool],
448    ) -> Option<(Self, usize)> {
449        if pk_flags.len() != col_count {
450            return None;
451        }
452
453        let op = ChangeOp::from_byte(*data.get(pos)?)?;
454        let mut offset = pos + 2;
455        let _indirect = *data.get(pos + 1)?;
456
457        let decode_n = |data: &[u8], offset: &mut usize, n: usize| -> Option<Vec<ChangesetValue>> {
458            let mut vals = Vec::with_capacity(n);
459            for _ in 0..n {
460                let (v, consumed) = ChangesetValue::decode(data, *offset)?;
461                *offset += consumed;
462                vals.push(v);
463            }
464            Some(vals)
465        };
466
467        let (old_values, new_values) = match op {
468            ChangeOp::Insert => {
469                let new_values = decode_n(data, &mut offset, col_count)?;
470                (Vec::new(), new_values)
471            }
472            ChangeOp::Delete => {
473                let pk_count = pk_flags.iter().filter(|&&is_pk| is_pk).count();
474                if pk_count == 0 {
475                    return None;
476                }
477                let pk_old_values = decode_n(data, &mut offset, pk_count)?;
478                let mut old_values = Vec::with_capacity(col_count);
479                let mut pk_iter = pk_old_values.into_iter();
480                for is_pk in pk_flags {
481                    if *is_pk {
482                        old_values.push(pk_iter.next()?);
483                    } else {
484                        old_values.push(ChangesetValue::Undefined);
485                    }
486                }
487                (old_values, Vec::new())
488            }
489            ChangeOp::Update => {
490                let record = decode_n(data, &mut offset, col_count)?;
491                let mut old_values = Vec::with_capacity(col_count);
492                let mut new_values = Vec::with_capacity(col_count);
493                for (index, value) in record.into_iter().enumerate() {
494                    if pk_flags.get(index).copied().unwrap_or(false) {
495                        old_values.push(value);
496                        new_values.push(ChangesetValue::Undefined);
497                    } else {
498                        old_values.push(ChangesetValue::Undefined);
499                        new_values.push(value);
500                    }
501                }
502                (old_values, new_values)
503            }
504        };
505
506        Some((
507            Self {
508                op,
509                old_values,
510                new_values,
511            },
512            offset - pos,
513        ))
514    }
515
516    /// Invert this change: INSERT becomes DELETE, DELETE becomes INSERT,
517    /// UPDATE swaps old and new values.
518    #[must_use]
519    pub fn invert(&self) -> Self {
520        match self.op {
521            ChangeOp::Insert => Self {
522                op: ChangeOp::Delete,
523                old_values: self.new_values.clone(),
524                new_values: Vec::new(),
525            },
526            ChangeOp::Delete => Self {
527                op: ChangeOp::Insert,
528                old_values: Vec::new(),
529                new_values: self.old_values.clone(),
530            },
531            ChangeOp::Update => Self {
532                op: ChangeOp::Update,
533                old_values: self.new_values.clone(),
534                new_values: self.old_values.clone(),
535            },
536        }
537    }
538}
539
540// ---------------------------------------------------------------------------
541// Per-table changeset section
542// ---------------------------------------------------------------------------
543
/// All row changes for a single table within a changeset.
///
/// Encoded as the table header followed by each row in order.
#[derive(Debug, Clone, PartialEq)]
pub struct TableChangeset {
    /// Header metadata (name, column count, primary-key flags) for the table.
    pub info: TableInfo,
    /// The recorded row changes, in encoding order.
    pub rows: Vec<ChangesetRow>,
}
550
551impl TableChangeset {
552    /// Encode this table section in changeset format.
553    pub fn encode_changeset(&self, out: &mut Vec<u8>) {
554        self.info.encode(out);
555        for row in &self.rows {
556            row.encode_changeset(out);
557        }
558    }
559
560    /// Encode this table section in patchset format.
561    pub fn encode_patchset(&self, out: &mut Vec<u8>) {
562        self.info.encode_patchset(out);
563        for row in &self.rows {
564            row.encode_patchset(out, &self.info.pk_flags);
565        }
566    }
567}
568
569// ---------------------------------------------------------------------------
570// Full changeset
571// ---------------------------------------------------------------------------
572
/// A complete changeset covering one or more tables.
///
/// The binary form is simply each table section written back to back.
#[derive(Debug, Clone, PartialEq)]
pub struct Changeset {
    /// Per-table sections, in encoding order.
    pub tables: Vec<TableChangeset>,
}
578
579impl Changeset {
580    /// Create an empty changeset.
581    #[must_use]
582    pub fn new() -> Self {
583        Self { tables: Vec::new() }
584    }
585
586    /// Encode the entire changeset in binary format.
587    #[must_use]
588    pub fn encode(&self) -> Vec<u8> {
589        let mut out = Vec::new();
590        for tc in &self.tables {
591            tc.encode_changeset(&mut out);
592        }
593        out
594    }
595
596    /// Encode the entire changeset as a patchset (compact form).
597    #[must_use]
598    pub fn encode_patchset(&self) -> Vec<u8> {
599        let mut out = Vec::new();
600        for tc in &self.tables {
601            tc.encode_patchset(&mut out);
602        }
603        out
604    }
605
606    /// Decode a changeset from its binary representation.
607    pub fn decode(data: &[u8]) -> Option<Self> {
608        let mut tables = Vec::new();
609        let mut pos = 0;
610        while pos < data.len() {
611            let (info, consumed) = TableInfo::decode(data, pos)?;
612            pos += consumed;
613            let mut rows = Vec::new();
614            // Read rows until we hit another table header or end of data.
615            while pos < data.len() && data[pos] != CHANGESET_TABLE_HEADER_BYTE {
616                let (row, consumed) = ChangesetRow::decode_changeset(data, pos, info.column_count)?;
617                pos += consumed;
618                rows.push(row);
619            }
620            tables.push(TableChangeset { info, rows });
621        }
622        Some(Self { tables })
623    }
624
625    /// Decode a patchset from its binary representation.
626    pub fn decode_patchset(data: &[u8]) -> Option<Self> {
627        let mut tables = Vec::new();
628        let mut pos = 0;
629        while pos < data.len() {
630            let (info, consumed) = TableInfo::decode_patchset(data, pos)?;
631            pos += consumed;
632            let mut rows = Vec::new();
633            while pos < data.len() && data[pos] != PATCHSET_TABLE_HEADER_BYTE {
634                let (row, consumed) =
635                    ChangesetRow::decode_patchset(data, pos, info.column_count, &info.pk_flags)?;
636                pos += consumed;
637                rows.push(row);
638            }
639            tables.push(TableChangeset { info, rows });
640        }
641        Some(Self { tables })
642    }
643
644    /// Invert the changeset: every INSERT becomes DELETE, every DELETE
645    /// becomes INSERT, every UPDATE swaps old and new values.
646    #[must_use]
647    pub fn invert(&self) -> Self {
648        Self {
649            tables: self
650                .tables
651                .iter()
652                .map(|tc| TableChangeset {
653                    info: tc.info.clone(),
654                    rows: tc.rows.iter().map(ChangesetRow::invert).collect(),
655                })
656                .collect(),
657        }
658    }
659
660    /// Concatenate another changeset onto this one.
661    pub fn concat(&mut self, other: &Self) {
662        for tc in &other.tables {
663            self.tables.push(tc.clone());
664        }
665    }
666}
667
668impl Default for Changeset {
669    fn default() -> Self {
670        Self::new()
671    }
672}
673
674// ---------------------------------------------------------------------------
675// Session — change tracker
676// ---------------------------------------------------------------------------
677
/// A recorded change entry tracked by a [`Session`].
#[derive(Debug, Clone)]
struct TrackedChange {
    /// Name of the table the change belongs to.
    table_name: String,
    /// The kind of change.
    op: ChangeOp,
    /// Old row values; checked for width on DELETE and UPDATE by
    /// `assert_tracked_change_width`.
    old_values: Vec<ChangesetValue>,
    /// New row values; checked for width on INSERT and UPDATE. For UPDATE,
    /// `Undefined` entries are filled from the old row by
    /// `materialize_sparse_update`.
    new_values: Vec<ChangesetValue>,
}
686
/// Metadata about a table being tracked by a [`Session`].
#[derive(Debug, Clone)]
struct TrackedTable {
    /// Table name.
    name: String,
    /// Number of columns every tracked change for this table must carry.
    column_count: usize,
    /// For each column, `true` if it is part of the primary key.
    pk_flags: Vec<bool>,
}
694
695fn assert_tracked_change_width(change: &TrackedChange, tracked: &TrackedTable) {
696    match change.op {
697        ChangeOp::Insert => assert_eq!(
698            change.new_values.len(),
699            tracked.column_count,
700            "insert values length must match attached table column_count"
701        ),
702        ChangeOp::Delete => assert_eq!(
703            change.old_values.len(),
704            tracked.column_count,
705            "delete values length must match attached table column_count"
706        ),
707        ChangeOp::Update => {
708            assert_eq!(
709                change.old_values.len(),
710                tracked.column_count,
711                "update old_values length must match attached table column_count"
712            );
713            assert_eq!(
714                change.new_values.len(),
715                tracked.column_count,
716                "update new_values length must match attached table column_count"
717            );
718        }
719    }
720}
721
/// Returns `true` when at least one column is flagged as part of the
/// primary key.
fn has_primary_key(pk_flags: &[bool]) -> bool {
    pk_flags.contains(&true)
}
725
726fn primary_key_values_are_trackable(values: &[ChangesetValue], pk_flags: &[bool]) -> bool {
727    pk_flags
728        .iter()
729        .enumerate()
730        .filter(|(_, is_pk)| **is_pk)
731        .all(|(index, _)| {
732            values.get(index).is_some_and(|value| {
733                !matches!(value, ChangesetValue::Null | ChangesetValue::Undefined)
734            })
735        })
736}
737
738fn materialize_sparse_update(
739    base_row: &[ChangesetValue],
740    sparse_values: &[ChangesetValue],
741) -> Vec<ChangesetValue> {
742    base_row
743        .iter()
744        .zip(sparse_values.iter())
745        .map(|(base, delta)| {
746            if *delta == ChangesetValue::Undefined {
747                base.clone()
748            } else {
749                delta.clone()
750            }
751        })
752        .collect()
753}
754
755fn canonical_old_values(
756    old_row: &[ChangesetValue],
757    new_row: &[ChangesetValue],
758    pk_flags: &[bool],
759) -> Vec<ChangesetValue> {
760    old_row
761        .iter()
762        .zip(new_row.iter())
763        .enumerate()
764        .map(|(index, (old, new))| {
765            if pk_flags.get(index).copied().unwrap_or(false) || old != new {
766                old.clone()
767            } else {
768                ChangesetValue::Undefined
769            }
770        })
771        .collect()
772}
773
774fn canonical_new_values(
775    old_row: &[ChangesetValue],
776    new_row: &[ChangesetValue],
777) -> Vec<ChangesetValue> {
778    old_row
779        .iter()
780        .zip(new_row.iter())
781        .map(|(old, new)| {
782            if old == new {
783                ChangesetValue::Undefined
784            } else {
785                new.clone()
786            }
787        })
788        .collect()
789}
790
791fn primary_key_matches(
792    left: &[ChangesetValue],
793    right: &[ChangesetValue],
794    pk_flags: &[bool],
795) -> bool {
796    pk_flags
797        .iter()
798        .enumerate()
799        .filter(|&(_, &is_pk)| is_pk)
800        .all(|(index, _)| {
801            left.get(index)
802                .zip(right.get(index))
803                .is_some_and(|(lhs, rhs)| lhs == rhs)
804        })
805}
806
807fn primary_key_changed(
808    old_row: &[ChangesetValue],
809    new_row: &[ChangesetValue],
810    pk_flags: &[bool],
811) -> bool {
812    !primary_key_matches(old_row, new_row, pk_flags)
813}
814
/// The net effect of one or more tracked changes on a single row, held as
/// the row's state before the first change and after the last one.
#[derive(Debug, Clone)]
struct PendingRowChange {
    /// Row values before the first change; `None` when the first change was
    /// an INSERT.
    before: Option<Vec<ChangesetValue>>,
    /// Row values after the changes merged so far; `None` when the row is
    /// currently deleted.
    after: Option<Vec<ChangesetValue>>,
}
820
821impl PendingRowChange {
822    fn from_tracked(change: &TrackedChange, column_count: usize) -> Self {
823        match change.op {
824            ChangeOp::Insert => Self {
825                before: None,
826                after: Some(change.new_values.clone()),
827            },
828            ChangeOp::Delete => Self {
829                before: Some(change.old_values.clone()),
830                after: None,
831            },
832            ChangeOp::Update => {
833                debug_assert_eq!(change.old_values.len(), column_count);
834                debug_assert_eq!(change.new_values.len(), column_count);
835                Self {
836                    before: Some(change.old_values.clone()),
837                    after: Some(materialize_sparse_update(
838                        &change.old_values,
839                        &change.new_values,
840                    )),
841                }
842            }
843        }
844    }
845
846    fn matches_change(&self, change: &TrackedChange, pk_flags: &[bool]) -> bool {
847        match change.op {
848            ChangeOp::Insert => {
849                self.before.as_ref().zip(self.after.as_ref()).is_none()
850                    && self.before.as_ref().is_some_and(|before| {
851                        primary_key_matches(before, &change.new_values, pk_flags)
852                    })
853            }
854            ChangeOp::Delete | ChangeOp::Update => self
855                .after
856                .as_ref()
857                .is_some_and(|after| primary_key_matches(after, &change.old_values, pk_flags)),
858        }
859    }
860
861    fn merge(&mut self, change: &TrackedChange, column_count: usize) {
862        match change.op {
863            ChangeOp::Insert => {
864                self.after = Some(change.new_values.clone());
865            }
866            ChangeOp::Delete => {
867                self.after = None;
868            }
869            ChangeOp::Update => {
870                debug_assert_eq!(change.old_values.len(), column_count);
871                debug_assert_eq!(change.new_values.len(), column_count);
872                if let Some(current_row) = self.after.as_ref() {
873                    self.after = Some(materialize_sparse_update(current_row, &change.new_values));
874                }
875            }
876        }
877    }
878
879    fn is_no_op(&self) -> bool {
880        matches!((&self.before, &self.after), (None, None))
881            || self
882                .before
883                .as_ref()
884                .zip(self.after.as_ref())
885                .is_some_and(|(before, after)| before == after)
886    }
887
888    fn into_changeset_rows(self, pk_flags: &[bool]) -> Vec<ChangesetRow> {
889        match (self.before, self.after) {
890            (None, None) => Vec::new(),
891            (None, Some(new_row)) => vec![ChangesetRow {
892                op: ChangeOp::Insert,
893                old_values: Vec::new(),
894                new_values: new_row,
895            }],
896            (Some(old_row), None) => vec![ChangesetRow {
897                op: ChangeOp::Delete,
898                old_values: old_row,
899                new_values: Vec::new(),
900            }],
901            (Some(old_row), Some(new_row)) => {
902                if old_row == new_row {
903                    Vec::new()
904                } else if primary_key_changed(&old_row, &new_row, pk_flags) {
905                    vec![
906                        ChangesetRow {
907                            op: ChangeOp::Delete,
908                            old_values: old_row,
909                            new_values: Vec::new(),
910                        },
911                        ChangesetRow {
912                            op: ChangeOp::Insert,
913                            old_values: Vec::new(),
914                            new_values: new_row,
915                        },
916                    ]
917                } else {
918                    vec![ChangesetRow {
919                        op: ChangeOp::Update,
920                        old_values: canonical_old_values(&old_row, &new_row, pk_flags),
921                        new_values: canonical_new_values(&old_row, &new_row),
922                    }]
923                }
924            }
925        }
926    }
927}
928
/// A session that records database changes for later extraction as a
/// changeset or patchset.
///
/// In a real database engine this would hook into the DML pipeline. For now
/// it provides a programmatic API for recording changes and generating the
/// binary changeset/patchset encoding.
#[derive(Debug)]
pub struct Session {
    /// Attached tables, in attachment order; changeset sections are emitted
    /// in this order.
    tables: Vec<TrackedTable>,
    /// Every recorded change, in the order the `record_*` methods were called.
    changes: Vec<TrackedChange>,
}
940
941impl Session {
942    fn tracked_table(&self, table: &str) -> Option<&TrackedTable> {
943        self.tables.iter().find(|tracked| tracked.name == table)
944    }
945
946    fn validate_attached_row_width(&self, table: &str, values: &[ChangesetValue], kind: &str) {
947        if let Some(tracked) = self.tracked_table(table) {
948            assert_eq!(
949                values.len(),
950                tracked.column_count,
951                "attached table row width mismatch for {kind}: table `{table}` expects {} columns but got {}",
952                tracked.column_count,
953                values.len()
954            );
955        }
956    }
957
958    fn validate_attached_update_width(
959        &self,
960        table: &str,
961        old_values: &[ChangesetValue],
962        new_values: &[ChangesetValue],
963    ) {
964        if let Some(tracked) = self.tracked_table(table) {
965            assert_eq!(
966                old_values.len(),
967                tracked.column_count,
968                "attached table row width mismatch for update old values: table `{table}` expects {} columns but got {}",
969                tracked.column_count,
970                old_values.len()
971            );
972            assert_eq!(
973                new_values.len(),
974                tracked.column_count,
975                "attached table row width mismatch for update new values: table `{table}` expects {} columns but got {}",
976                tracked.column_count,
977                new_values.len()
978            );
979        }
980    }
981
982    /// Create a new, empty session.
983    #[must_use]
984    pub fn new() -> Self {
985        Self {
986            tables: Vec::new(),
987            changes: Vec::new(),
988        }
989    }
990
991    /// Attach a table for change tracking.
992    ///
993    /// `pk_flags` indicates which columns are part of the primary key.
994    ///
995    /// SQLite session changesets only track tables with an explicit primary
996    /// key. Attached tables with no key columns are ignored when generating
997    /// changesets or patchsets.
998    pub fn attach_table(&mut self, name: &str, column_count: usize, pk_flags: Vec<bool>) {
999        assert_eq!(
1000            pk_flags.len(),
1001            column_count,
1002            "pk_flags length must match column_count"
1003        );
1004        self.tables.push(TrackedTable {
1005            name: name.to_owned(),
1006            column_count,
1007            pk_flags,
1008        });
1009    }
1010
1011    /// Record an INSERT operation.
1012    pub fn record_insert(&mut self, table: &str, new_values: Vec<ChangesetValue>) {
1013        self.validate_attached_row_width(table, &new_values, "insert");
1014        self.changes.push(TrackedChange {
1015            table_name: table.to_owned(),
1016            op: ChangeOp::Insert,
1017            old_values: Vec::new(),
1018            new_values,
1019        });
1020    }
1021
1022    /// Record a DELETE operation.
1023    pub fn record_delete(&mut self, table: &str, old_values: Vec<ChangesetValue>) {
1024        self.validate_attached_row_width(table, &old_values, "delete");
1025        self.changes.push(TrackedChange {
1026            table_name: table.to_owned(),
1027            op: ChangeOp::Delete,
1028            old_values,
1029            new_values: Vec::new(),
1030        });
1031    }
1032
1033    /// Record an UPDATE operation.
1034    ///
1035    /// `old_values` and `new_values` must have the same length. Use
1036    /// [`ChangesetValue::Undefined`] for columns that did not change.
1037    pub fn record_update(
1038        &mut self,
1039        table: &str,
1040        old_values: Vec<ChangesetValue>,
1041        new_values: Vec<ChangesetValue>,
1042    ) {
1043        self.validate_attached_update_width(table, &old_values, &new_values);
1044        self.changes.push(TrackedChange {
1045            table_name: table.to_owned(),
1046            op: ChangeOp::Update,
1047            old_values,
1048            new_values,
1049        });
1050    }
1051
1052    /// Generate a [`Changeset`] from all recorded changes.
1053    #[must_use]
1054    pub fn changeset(&self) -> Changeset {
1055        self.build_changeset_impl()
1056    }
1057
1058    /// Generate a patchset (compact binary format).
1059    #[must_use]
1060    pub fn patchset(&self) -> Vec<u8> {
1061        let cs = self.build_changeset_impl();
1062        cs.encode_patchset()
1063    }
1064
1065    /// Internal: collate tracked changes into per-table changeset sections.
1066    fn build_changeset_impl(&self) -> Changeset {
1067        let mut tables = Vec::new();
1068        // Emit tables in the order they were attached (deterministic).
1069        for tracked in &self.tables {
1070            if !has_primary_key(&tracked.pk_flags) {
1071                continue;
1072            }
1073
1074            let mut pending = Vec::<PendingRowChange>::new();
1075            for change in self
1076                .changes
1077                .iter()
1078                .filter(|change| change.table_name == tracked.name)
1079            {
1080                assert_tracked_change_width(change, tracked);
1081                let key_source = match change.op {
1082                    ChangeOp::Insert => &change.new_values,
1083                    ChangeOp::Delete | ChangeOp::Update => &change.old_values,
1084                };
1085                if !primary_key_values_are_trackable(key_source, &tracked.pk_flags) {
1086                    continue;
1087                }
1088                if let Some(index) = pending
1089                    .iter()
1090                    .position(|existing| existing.matches_change(change, &tracked.pk_flags))
1091                {
1092                    pending[index].merge(change, tracked.column_count);
1093                    if pending[index].is_no_op() {
1094                        pending.remove(index);
1095                    }
1096                } else {
1097                    pending.push(PendingRowChange::from_tracked(change, tracked.column_count));
1098                }
1099            }
1100
1101            let rows = pending
1102                .into_iter()
1103                .flat_map(|change| change.into_changeset_rows(&tracked.pk_flags))
1104                .collect::<Vec<_>>();
1105            if !rows.is_empty() {
1106                tables.push(TableChangeset {
1107                    info: TableInfo {
1108                        name: tracked.name.clone(),
1109                        column_count: tracked.column_count,
1110                        pk_flags: tracked.pk_flags.clone(),
1111                    },
1112                    rows,
1113                });
1114            }
1115        }
1116        // Changes for unattached tables, or attached tables without an
1117        // explicit primary key, are intentionally dropped to match SQLite
1118        // session semantics.
1119        Changeset { tables }
1120    }
1121}
1122
impl Default for Session {
    /// Equivalent to [`Session::new`]: no attached tables, no recorded changes.
    fn default() -> Self {
        Self::new()
    }
}
1128
1129// ---------------------------------------------------------------------------
1130// Changeset application
1131// ---------------------------------------------------------------------------
1132
/// Outcome of applying a changeset to a target dataset.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ApplyOutcome {
    /// All changes were applied (some may have been skipped via `OmitChange`).
    Success {
        /// Number of row changes that were applied.
        applied: usize,
        /// Number of row changes that were skipped.
        skipped: usize,
    },
    /// The apply was aborted by the conflict handler.
    Aborted {
        /// Number of row changes that had been applied before the abort.
        applied: usize,
    },
}
1141
/// A simple in-memory "database" for testing changeset application.
///
/// Maps `table_name -> Vec<row>` where each row is `Vec<SqliteValue>`.
/// This is intentionally minimal; the real apply engine would operate on
/// the B-tree layer.
#[derive(Debug, Clone, Default)]
pub struct SimpleTarget {
    /// Table contents keyed by table name; each table is a list of rows.
    pub tables: std::collections::HashMap<String, Vec<Vec<SqliteValue>>>,
}
1151
/// Result of applying a single row change.
///
/// `Ok(true)` means the change was applied, `Ok(false)` means it was skipped,
/// and `Err(applied)` means abort, carrying the number of rows that had been
/// applied before this one.
type RowApplyResult = Result<bool, usize>;
1155
1156impl SimpleTarget {
1157    /// Apply a changeset to this target, using `handler` for conflict
1158    /// resolution.
1159    pub fn apply<F>(&mut self, changeset: &Changeset, mut handler: F) -> ApplyOutcome
1160    where
1161        F: FnMut(ConflictType, &ChangesetRow) -> ConflictAction,
1162    {
1163        let original_tables = self.tables.clone();
1164        let mut touched_tables = Vec::new();
1165        let mut applied = 0usize;
1166        let mut skipped = 0usize;
1167
1168        for tc in &changeset.tables {
1169            if !touched_tables
1170                .iter()
1171                .any(|name: &String| name == &tc.info.name)
1172            {
1173                touched_tables.push(tc.info.name.clone());
1174            }
1175            let rows = self.tables.entry(tc.info.name.clone()).or_default();
1176            for change in &tc.rows {
1177                let result = match change.op {
1178                    ChangeOp::Insert => {
1179                        Self::apply_insert(rows, &tc.info.pk_flags, change, &mut handler, applied)
1180                    }
1181                    ChangeOp::Delete => {
1182                        Self::apply_delete(rows, &tc.info.pk_flags, change, &mut handler, applied)
1183                    }
1184                    ChangeOp::Update => {
1185                        Self::apply_update(rows, &tc.info.pk_flags, change, &mut handler, applied)
1186                    }
1187                };
1188                match result {
1189                    Ok(true) => applied += 1,
1190                    Ok(false) => skipped += 1,
1191                    Err(n) => {
1192                        self.tables = original_tables;
1193                        for table_name in touched_tables {
1194                            self.tables.entry(table_name).or_default();
1195                        }
1196                        return ApplyOutcome::Aborted { applied: n };
1197                    }
1198                }
1199            }
1200        }
1201        ApplyOutcome::Success { applied, skipped }
1202    }
1203
1204    fn apply_insert<F>(
1205        rows: &mut Vec<Vec<SqliteValue>>,
1206        pk_flags: &[bool],
1207        change: &ChangesetRow,
1208        handler: &mut F,
1209        applied: usize,
1210    ) -> RowApplyResult
1211    where
1212        F: FnMut(ConflictType, &ChangesetRow) -> ConflictAction,
1213    {
1214        let new_row: Vec<SqliteValue> = change
1215            .new_values
1216            .iter()
1217            .map(ChangesetValue::to_sqlite)
1218            .collect();
1219        if Self::find_row_by_pk(rows, pk_flags, &new_row).is_some() {
1220            match handler(ConflictType::Conflict, change) {
1221                ConflictAction::OmitChange => return Ok(false),
1222                ConflictAction::Replace => {
1223                    let idx =
1224                        Self::find_row_by_pk(rows, pk_flags, &new_row).expect("row just found");
1225                    rows[idx] = new_row;
1226                    return Ok(true);
1227                }
1228                ConflictAction::Abort => return Err(applied),
1229            }
1230        }
1231        rows.push(new_row);
1232        Ok(true)
1233    }
1234
1235    fn apply_delete<F>(
1236        rows: &mut Vec<Vec<SqliteValue>>,
1237        pk_flags: &[bool],
1238        change: &ChangesetRow,
1239        handler: &mut F,
1240        applied: usize,
1241    ) -> RowApplyResult
1242    where
1243        F: FnMut(ConflictType, &ChangesetRow) -> ConflictAction,
1244    {
1245        let pk_target: Vec<SqliteValue> = change
1246            .old_values
1247            .iter()
1248            .map(ChangesetValue::to_sqlite)
1249            .collect();
1250        if let Some(idx) = Self::find_row_by_pk(rows, pk_flags, &pk_target) {
1251            let old_match =
1252                change
1253                    .old_values
1254                    .iter()
1255                    .zip(rows[idx].iter())
1256                    .all(|(cv, sv)| match cv {
1257                        ChangesetValue::Undefined => true,
1258                        _ => cv.to_sqlite() == *sv,
1259                    });
1260            if !old_match {
1261                match handler(ConflictType::Data, change) {
1262                    ConflictAction::OmitChange => return Ok(false),
1263                    ConflictAction::Replace => {
1264                        rows.remove(idx);
1265                        return Ok(true);
1266                    }
1267                    ConflictAction::Abort => return Err(applied),
1268                }
1269            }
1270            rows.remove(idx);
1271            Ok(true)
1272        } else {
1273            match handler(ConflictType::NotFound, change) {
1274                ConflictAction::OmitChange => Ok(false),
1275                ConflictAction::Replace | ConflictAction::Abort => Err(applied),
1276            }
1277        }
1278    }
1279
1280    fn apply_update<F>(
1281        rows: &mut Vec<Vec<SqliteValue>>,
1282        pk_flags: &[bool],
1283        change: &ChangesetRow,
1284        handler: &mut F,
1285        applied: usize,
1286    ) -> RowApplyResult
1287    where
1288        F: FnMut(ConflictType, &ChangesetRow) -> ConflictAction,
1289    {
1290        let old_row: Vec<SqliteValue> = change
1291            .old_values
1292            .iter()
1293            .map(ChangesetValue::to_sqlite)
1294            .collect();
1295        if let Some(idx) = Self::find_row_by_pk(rows, pk_flags, &old_row) {
1296            let old_match =
1297                change
1298                    .old_values
1299                    .iter()
1300                    .zip(rows[idx].iter())
1301                    .all(|(cv, sv)| match cv {
1302                        ChangesetValue::Undefined => true,
1303                        _ => cv.to_sqlite() == *sv,
1304                    });
1305            if !old_match {
1306                match handler(ConflictType::Data, change) {
1307                    ConflictAction::OmitChange => return Ok(false),
1308                    ConflictAction::Replace => {}
1309                    ConflictAction::Abort => return Err(applied),
1310                }
1311            }
1312            let original_row = rows[idx].clone();
1313            let mut updated_row = original_row.clone();
1314            for (i, nv) in change.new_values.iter().enumerate() {
1315                if *nv != ChangesetValue::Undefined
1316                    && let Some(cell) = updated_row.get_mut(i)
1317                {
1318                    *cell = nv.to_sqlite();
1319                }
1320            }
1321
1322            let pk_changed = pk_flags
1323                .iter()
1324                .enumerate()
1325                .filter(|&(_, &is_pk)| is_pk)
1326                .any(|(i, _)| original_row.get(i) != updated_row.get(i));
1327            if pk_changed
1328                && let Some(conflict_idx) =
1329                    rows.iter().enumerate().find_map(|(candidate_idx, row)| {
1330                        (candidate_idx != idx
1331                            && pk_flags
1332                                .iter()
1333                                .enumerate()
1334                                .filter(|&(_, &is_pk)| is_pk)
1335                                .all(|(i, _)| {
1336                                    row.get(i)
1337                                        .zip(updated_row.get(i))
1338                                        .is_some_and(|(a, b)| a == b)
1339                                }))
1340                        .then_some(candidate_idx)
1341                    })
1342            {
1343                match handler(ConflictType::Conflict, change) {
1344                    ConflictAction::OmitChange => return Ok(false),
1345                    ConflictAction::Replace => {
1346                        rows.remove(conflict_idx);
1347                        let target_idx = if conflict_idx < idx { idx - 1 } else { idx };
1348                        rows[target_idx] = updated_row;
1349                        return Ok(true);
1350                    }
1351                    ConflictAction::Abort => return Err(applied),
1352                }
1353            }
1354            rows[idx] = updated_row;
1355            Ok(true)
1356        } else {
1357            match handler(ConflictType::NotFound, change) {
1358                ConflictAction::OmitChange => Ok(false),
1359                ConflictAction::Replace | ConflictAction::Abort => Err(applied),
1360            }
1361        }
1362    }
1363
1364    fn find_row_by_pk(
1365        rows: &[Vec<SqliteValue>],
1366        pk_flags: &[bool],
1367        target: &[SqliteValue],
1368    ) -> Option<usize> {
1369        if !has_primary_key(pk_flags) {
1370            return rows.iter().position(|row| row == target);
1371        }
1372        rows.iter().position(|row| {
1373            pk_flags
1374                .iter()
1375                .enumerate()
1376                .filter(|&(_, &is_pk)| is_pk)
1377                .all(|(i, _)| row.get(i).zip(target.get(i)).is_some_and(|(a, b)| a == b))
1378        })
1379    }
1380}
1381
1382// ---------------------------------------------------------------------------
// Varint helpers (thin public wrappers over `fsqlite_types`)
1384// ---------------------------------------------------------------------------
1385
/// Compute the byte length of a varint-encoded value.
///
/// Thin `const` wrapper that forwards `value` to `varint_len`.
#[must_use]
pub const fn changeset_varint_len(value: u64) -> usize {
    varint_len(value)
}
1391
1392// ---------------------------------------------------------------------------
1393// Tests
1394// ---------------------------------------------------------------------------
1395
1396#[cfg(test)]
1397mod tests {
1398    use super::*;
1399
1400    #[test]
1401    fn test_extension_name_matches_crate_suffix() {
1402        let expected = env!("CARGO_PKG_NAME")
1403            .strip_prefix("fsqlite-ext-")
1404            .expect("extension crates should use fsqlite-ext-* naming");
1405        assert_eq!(extension_name(), expected);
1406    }
1407
1408    // -----------------------------------------------------------------------
1409    // ChangeOp round-trip
1410    // -----------------------------------------------------------------------
1411
1412    #[test]
1413    fn test_change_op_byte_roundtrip() {
1414        for op in [ChangeOp::Insert, ChangeOp::Delete, ChangeOp::Update] {
1415            assert_eq!(ChangeOp::from_byte(op.as_byte()), Some(op));
1416        }
1417        assert_eq!(ChangeOp::from_byte(0xFF), None);
1418    }
1419
1420    #[test]
1421    fn test_change_op_byte_values() {
1422        assert_eq!(ChangeOp::Insert.as_byte(), 18);
1423        assert_eq!(ChangeOp::Delete.as_byte(), 9);
1424        assert_eq!(ChangeOp::Update.as_byte(), 23);
1425    }
1426
1427    // -----------------------------------------------------------------------
1428    // ChangesetValue encoding / decoding
1429    // -----------------------------------------------------------------------
1430
1431    #[test]
1432    fn test_changeset_value_undefined() {
1433        let mut buf = Vec::new();
1434        ChangesetValue::Undefined.encode(&mut buf);
1435        assert_eq!(buf, [VAL_UNDEFINED]);
1436        let (val, consumed) = ChangesetValue::decode(&buf, 0).unwrap();
1437        assert_eq!(val, ChangesetValue::Undefined);
1438        assert_eq!(consumed, 1);
1439    }
1440
1441    #[test]
1442    fn test_changeset_value_null() {
1443        let mut buf = Vec::new();
1444        ChangesetValue::Null.encode(&mut buf);
1445        assert_eq!(buf, [VAL_NULL]);
1446        let (val, consumed) = ChangesetValue::decode(&buf, 0).unwrap();
1447        assert_eq!(val, ChangesetValue::Null);
1448        assert_eq!(consumed, 1);
1449    }
1450
1451    #[test]
1452    fn test_changeset_value_integer() {
1453        let mut buf = Vec::new();
1454        ChangesetValue::Integer(42).encode(&mut buf);
1455        assert_eq!(buf[0], VAL_INTEGER);
1456        assert_eq!(&buf[1..], 42_i64.to_be_bytes());
1457        let (val, consumed) = ChangesetValue::decode(&buf, 0).unwrap();
1458        assert_eq!(val, ChangesetValue::Integer(42));
1459        assert_eq!(consumed, 9);
1460    }
1461
1462    #[test]
1463    fn test_changeset_value_integer_negative() {
1464        let mut buf = Vec::new();
1465        ChangesetValue::Integer(-12_345).encode(&mut buf);
1466        let (val, _) = ChangesetValue::decode(&buf, 0).unwrap();
1467        assert_eq!(val, ChangesetValue::Integer(-12_345));
1468    }
1469
1470    #[test]
1471    fn test_changeset_value_real() {
1472        let mut buf = Vec::new();
1473        ChangesetValue::Real(1.23).encode(&mut buf);
1474        assert_eq!(buf[0], VAL_REAL);
1475        assert_eq!(&buf[1..], 1.23_f64.to_be_bytes());
1476        let (val, consumed) = ChangesetValue::decode(&buf, 0).unwrap();
1477        assert_eq!(val, ChangesetValue::Real(1.23));
1478        assert_eq!(consumed, 9);
1479    }
1480
1481    #[test]
1482    fn test_changeset_value_text() {
1483        let mut buf = Vec::new();
1484        ChangesetValue::Text("hello".into()).encode(&mut buf);
1485        assert_eq!(buf[0], VAL_TEXT);
1486        // varint(5) = 0x05, then b"hello"
1487        assert_eq!(buf[1], 5);
1488        assert_eq!(&buf[2..], b"hello");
1489        let (val, consumed) = ChangesetValue::decode(&buf, 0).unwrap();
1490        assert_eq!(val, ChangesetValue::Text("hello".into()));
1491        assert_eq!(consumed, 7); // 1 type + 1 varint + 5 data
1492    }
1493
1494    #[test]
1495    fn test_changeset_value_text_empty() {
1496        let mut buf = Vec::new();
1497        ChangesetValue::Text(String::new()).encode(&mut buf);
1498        let (val, consumed) = ChangesetValue::decode(&buf, 0).unwrap();
1499        assert_eq!(val, ChangesetValue::Text(String::new()));
1500        assert_eq!(consumed, 2); // 1 type + 1 varint(0)
1501    }
1502
1503    #[test]
1504    fn test_changeset_value_blob() {
1505        let data = vec![0xDE, 0xAD, 0xBE, 0xEF];
1506        let mut buf = Vec::new();
1507        ChangesetValue::Blob(data.clone()).encode(&mut buf);
1508        assert_eq!(buf[0], VAL_BLOB);
1509        assert_eq!(buf[1], 4); // varint(4)
1510        assert_eq!(&buf[2..], &data);
1511        let (val, consumed) = ChangesetValue::decode(&buf, 0).unwrap();
1512        assert_eq!(val, ChangesetValue::Blob(data));
1513        assert_eq!(consumed, 6);
1514    }
1515
1516    #[test]
1517    fn test_changeset_value_decode_bad_type() {
1518        assert!(ChangesetValue::decode(&[0xFF], 0).is_none());
1519    }
1520
1521    #[test]
1522    fn test_changeset_value_decode_truncated() {
1523        // Integer needs 9 bytes total, give only 5.
1524        assert!(ChangesetValue::decode(&[VAL_INTEGER, 0, 0, 0, 0], 0).is_none());
1525    }
1526
1527    // -----------------------------------------------------------------------
1528    // TableInfo encoding / decoding
1529    // -----------------------------------------------------------------------
1530
1531    #[test]
1532    fn test_table_info_roundtrip() {
1533        let info = TableInfo {
1534            name: "users".to_owned(),
1535            column_count: 3,
1536            pk_flags: vec![true, false, false],
1537        };
1538        let mut buf = Vec::new();
1539        info.encode(&mut buf);
1540
1541        assert_eq!(buf[0], TABLE_HEADER_BYTE);
1542        let (decoded, consumed) = TableInfo::decode(&buf, 0).unwrap();
1543        assert_eq!(decoded, info);
1544        assert_eq!(consumed, buf.len());
1545    }
1546
1547    #[test]
1548    fn test_table_info_header_byte() {
1549        let info = TableInfo {
1550            name: "t".to_owned(),
1551            column_count: 1,
1552            pk_flags: vec![true],
1553        };
1554        let mut buf = Vec::new();
1555        info.encode(&mut buf);
1556        assert_eq!(buf[0], 0x54); // 'T'
1557    }
1558
1559    #[test]
1560    fn test_table_info_nul_terminated_name() {
1561        let info = TableInfo {
1562            name: "orders".to_owned(),
1563            column_count: 2,
1564            pk_flags: vec![true, false],
1565        };
1566        let mut buf = Vec::new();
1567        info.encode(&mut buf);
1568        // Last byte should be NUL terminator.
1569        assert_eq!(*buf.last().unwrap(), 0x00);
1570    }
1571
1572    // -----------------------------------------------------------------------
1573    // Session — basic tracking
1574    // -----------------------------------------------------------------------
1575
1576    #[test]
1577    fn test_session_create() {
1578        let session = Session::new();
1579        assert!(session.tables.is_empty());
1580        assert!(session.changes.is_empty());
1581    }
1582
1583    #[test]
1584    fn test_session_attach_table() {
1585        let mut session = Session::new();
1586        session.attach_table("users", 3, vec![true, false, false]);
1587        assert_eq!(session.tables.len(), 1);
1588        assert_eq!(session.tables[0].name, "users");
1589    }
1590
1591    #[test]
1592    fn test_session_record_insert() {
1593        let mut session = Session::new();
1594        session.attach_table("t", 2, vec![true, false]);
1595        session.record_insert(
1596            "t",
1597            vec![ChangesetValue::Integer(1), ChangesetValue::Text("a".into())],
1598        );
1599        let cs = session.changeset();
1600        assert_eq!(cs.tables.len(), 1);
1601        assert_eq!(cs.tables[0].rows.len(), 1);
1602        assert_eq!(cs.tables[0].rows[0].op, ChangeOp::Insert);
1603    }
1604
1605    #[test]
1606    fn test_session_record_delete() {
1607        let mut session = Session::new();
1608        session.attach_table("t", 2, vec![true, false]);
1609        session.record_delete(
1610            "t",
1611            vec![ChangesetValue::Integer(1), ChangesetValue::Text("a".into())],
1612        );
1613        let cs = session.changeset();
1614        assert_eq!(cs.tables[0].rows[0].op, ChangeOp::Delete);
1615    }
1616
1617    #[test]
1618    fn test_session_record_update() {
1619        let mut session = Session::new();
1620        session.attach_table("t", 2, vec![true, false]);
1621        session.record_update(
1622            "t",
1623            vec![ChangesetValue::Integer(1), ChangesetValue::Text("a".into())],
1624            vec![ChangesetValue::Undefined, ChangesetValue::Text("b".into())],
1625        );
1626        let cs = session.changeset();
1627        let row = &cs.tables[0].rows[0];
1628        assert_eq!(row.op, ChangeOp::Update);
1629        assert_eq!(row.old_values[1], ChangesetValue::Text("a".into()));
1630        assert_eq!(row.new_values[0], ChangesetValue::Undefined);
1631        assert_eq!(row.new_values[1], ChangesetValue::Text("b".into()));
1632    }
1633
1634    #[test]
1635    fn test_session_multiple_tables() {
1636        let mut session = Session::new();
1637        session.attach_table("a", 1, vec![true]);
1638        session.attach_table("b", 1, vec![true]);
1639        session.record_insert("a", vec![ChangesetValue::Integer(1)]);
1640        session.record_insert("b", vec![ChangesetValue::Integer(2)]);
1641        let cs = session.changeset();
1642        assert_eq!(cs.tables.len(), 2);
1643        assert_eq!(cs.tables[0].info.name, "a");
1644        assert_eq!(cs.tables[1].info.name, "b");
1645    }
1646
1647    #[test]
1648    fn test_session_pk_columns() {
1649        let mut session = Session::new();
1650        session.attach_table("t", 3, vec![true, false, true]);
1651        let cs = session.changeset();
1652        // Even with no changes, table metadata is not emitted (no rows).
1653        assert!(cs.tables.is_empty());
1654        // Add a change so the table shows up.
1655        session.record_insert(
1656            "t",
1657            vec![
1658                ChangesetValue::Integer(1),
1659                ChangesetValue::Text("x".into()),
1660                ChangesetValue::Integer(2),
1661            ],
1662        );
1663        let cs = session.changeset();
1664        assert_eq!(cs.tables[0].info.pk_flags, vec![true, false, true]);
1665    }
1666
1667    // -----------------------------------------------------------------------
1668    // Changeset binary format
1669    // -----------------------------------------------------------------------
1670
1671    #[test]
1672    fn test_changeset_binary_format() {
1673        let mut session = Session::new();
1674        session.attach_table("t", 2, vec![true, false]);
1675        session.record_insert(
1676            "t",
1677            vec![
1678                ChangesetValue::Integer(1),
1679                ChangesetValue::Text("hi".into()),
1680            ],
1681        );
1682        let encoded = session.changeset().encode();
1683        // Table header: 'T', varint(2), pk[1,0], "t\0"
1684        assert_eq!(encoded[0], 0x54);
1685        // Verify we can decode it back.
1686        let decoded = Changeset::decode(&encoded).unwrap();
1687        assert_eq!(decoded.tables.len(), 1);
1688        assert_eq!(decoded.tables[0].info.name, "t");
1689        assert_eq!(decoded.tables[0].rows[0].op, ChangeOp::Insert);
1690    }
1691
1692    #[test]
1693    fn test_session_changeset_coalesces_net_row_effects() {
1694        let mut session = Session::new();
1695        session.attach_table("accounts", 3, vec![true, false, false]);
1696        session.record_insert(
1697            "accounts",
1698            vec![
1699                ChangesetValue::Integer(1),
1700                ChangesetValue::Text("alice".into()),
1701                ChangesetValue::Integer(100),
1702            ],
1703        );
1704        session.record_insert(
1705            "accounts",
1706            vec![
1707                ChangesetValue::Integer(2),
1708                ChangesetValue::Text("bob".into()),
1709                ChangesetValue::Integer(50),
1710            ],
1711        );
1712        session.record_update(
1713            "accounts",
1714            vec![
1715                ChangesetValue::Integer(2),
1716                ChangesetValue::Text("bob".into()),
1717                ChangesetValue::Integer(50),
1718            ],
1719            vec![
1720                ChangesetValue::Undefined,
1721                ChangesetValue::Undefined,
1722                ChangesetValue::Integer(75),
1723            ],
1724        );
1725        session.record_delete(
1726            "accounts",
1727            vec![
1728                ChangesetValue::Integer(1),
1729                ChangesetValue::Text("alice".into()),
1730                ChangesetValue::Integer(100),
1731            ],
1732        );
1733
1734        let changeset = session.changeset();
1735        assert_eq!(changeset.tables.len(), 1);
1736        assert_eq!(changeset.tables[0].rows.len(), 1);
1737        assert_eq!(changeset.tables[0].rows[0].op, ChangeOp::Insert);
1738        assert_eq!(
1739            changeset.tables[0].rows[0].new_values,
1740            vec![
1741                ChangesetValue::Integer(2),
1742                ChangesetValue::Text("bob".into()),
1743                ChangesetValue::Integer(75),
1744            ]
1745        );
1746    }
1747
1748    #[test]
1749    fn test_session_changeset_tracks_follow_on_changes_after_pk_update() {
1750        let mut session = Session::new();
1751        session.attach_table("accounts", 3, vec![true, false, false]);
1752        session.record_insert(
1753            "accounts",
1754            vec![
1755                ChangesetValue::Integer(1),
1756                ChangesetValue::Text("alice".into()),
1757                ChangesetValue::Integer(100),
1758            ],
1759        );
1760        session.record_update(
1761            "accounts",
1762            vec![
1763                ChangesetValue::Integer(1),
1764                ChangesetValue::Text("alice".into()),
1765                ChangesetValue::Integer(100),
1766            ],
1767            vec![
1768                ChangesetValue::Integer(2),
1769                ChangesetValue::Undefined,
1770                ChangesetValue::Undefined,
1771            ],
1772        );
1773        session.record_update(
1774            "accounts",
1775            vec![
1776                ChangesetValue::Integer(2),
1777                ChangesetValue::Text("alice".into()),
1778                ChangesetValue::Integer(100),
1779            ],
1780            vec![
1781                ChangesetValue::Undefined,
1782                ChangesetValue::Text("ally".into()),
1783                ChangesetValue::Undefined,
1784            ],
1785        );
1786
1787        let changeset = session.changeset();
1788        assert_eq!(changeset.tables.len(), 1);
1789        assert_eq!(changeset.tables[0].rows.len(), 1);
1790        assert_eq!(changeset.tables[0].rows[0].op, ChangeOp::Insert);
1791        assert_eq!(
1792            changeset.tables[0].rows[0].new_values,
1793            vec![
1794                ChangesetValue::Integer(2),
1795                ChangesetValue::Text("ally".into()),
1796                ChangesetValue::Integer(100),
1797            ]
1798        );
1799    }
1800
1801    #[test]
1802    fn test_session_changeset_existing_row_pk_update_emits_delete_then_insert() {
1803        let mut session = Session::new();
1804        session.attach_table("accounts", 3, vec![true, false, false]);
1805        session.record_update(
1806            "accounts",
1807            vec![
1808                ChangesetValue::Integer(1),
1809                ChangesetValue::Text("alice".into()),
1810                ChangesetValue::Integer(100),
1811            ],
1812            vec![
1813                ChangesetValue::Integer(2),
1814                ChangesetValue::Text("ally".into()),
1815                ChangesetValue::Undefined,
1816            ],
1817        );
1818
1819        let changeset = session.changeset();
1820        assert_eq!(changeset.tables.len(), 1);
1821        assert_eq!(changeset.tables[0].rows.len(), 2);
1822        assert_eq!(changeset.tables[0].rows[0].op, ChangeOp::Delete);
1823        assert_eq!(
1824            changeset.tables[0].rows[0].old_values,
1825            vec![
1826                ChangesetValue::Integer(1),
1827                ChangesetValue::Text("alice".into()),
1828                ChangesetValue::Integer(100),
1829            ]
1830        );
1831        assert_eq!(changeset.tables[0].rows[1].op, ChangeOp::Insert);
1832        assert_eq!(
1833            changeset.tables[0].rows[1].new_values,
1834            vec![
1835                ChangesetValue::Integer(2),
1836                ChangesetValue::Text("ally".into()),
1837                ChangesetValue::Integer(100),
1838            ]
1839        );
1840    }
1841
1842    #[test]
1843    fn test_session_changeset_existing_row_pk_update_then_delete_emits_delete_only() {
1844        let mut session = Session::new();
1845        session.attach_table("accounts", 3, vec![true, false, false]);
1846        session.record_update(
1847            "accounts",
1848            vec![
1849                ChangesetValue::Integer(1),
1850                ChangesetValue::Text("alice".into()),
1851                ChangesetValue::Integer(100),
1852            ],
1853            vec![
1854                ChangesetValue::Integer(2),
1855                ChangesetValue::Text("ally".into()),
1856                ChangesetValue::Undefined,
1857            ],
1858        );
1859        session.record_delete(
1860            "accounts",
1861            vec![
1862                ChangesetValue::Integer(2),
1863                ChangesetValue::Text("ally".into()),
1864                ChangesetValue::Integer(100),
1865            ],
1866        );
1867
1868        let changeset = session.changeset();
1869        assert_eq!(changeset.tables.len(), 1);
1870        assert_eq!(changeset.tables[0].rows.len(), 1);
1871        assert_eq!(changeset.tables[0].rows[0].op, ChangeOp::Delete);
1872        assert_eq!(
1873            changeset.tables[0].rows[0].old_values,
1874            vec![
1875                ChangesetValue::Integer(1),
1876                ChangesetValue::Text("alice".into()),
1877                ChangesetValue::Integer(100),
1878            ]
1879        );
1880    }
1881
1882    #[test]
1883    #[should_panic(expected = "attached table row width mismatch for update new values")]
1884    fn test_session_record_update_rejects_attached_table_width_mismatch() {
1885        let mut session = Session::new();
1886        session.attach_table("accounts", 3, vec![true, false, false]);
1887        session.record_update(
1888            "accounts",
1889            vec![
1890                ChangesetValue::Integer(1),
1891                ChangesetValue::Text("alice".into()),
1892                ChangesetValue::Integer(100),
1893            ],
1894            vec![
1895                ChangesetValue::Undefined,
1896                ChangesetValue::Text("ally".into()),
1897            ],
1898        );
1899    }
1900
1901    #[test]
1902    #[should_panic(expected = "attached table row width mismatch for insert")]
1903    fn test_session_record_insert_rejects_attached_table_width_mismatch() {
1904        let mut session = Session::new();
1905        session.attach_table("accounts", 3, vec![true, false, false]);
1906        session.record_insert(
1907            "accounts",
1908            vec![
1909                ChangesetValue::Integer(1),
1910                ChangesetValue::Text("alice".into()),
1911            ],
1912        );
1913    }
1914
1915    #[test]
1916    fn test_session_pk_update_emits_delete_and_insert() {
1917        let mut session = Session::new();
1918        session.attach_table("accounts", 2, vec![true, false]);
1919        session.record_update(
1920            "accounts",
1921            vec![
1922                ChangesetValue::Integer(1),
1923                ChangesetValue::Text("alice".into()),
1924            ],
1925            vec![
1926                ChangesetValue::Integer(2),
1927                ChangesetValue::Text("alicia".into()),
1928            ],
1929        );
1930
1931        let changeset = session.changeset();
1932        assert_eq!(changeset.tables.len(), 1);
1933        assert_eq!(changeset.tables[0].rows.len(), 2);
1934        assert_eq!(changeset.tables[0].rows[0].op, ChangeOp::Delete);
1935        assert_eq!(
1936            changeset.tables[0].rows[0].old_values,
1937            vec![
1938                ChangesetValue::Integer(1),
1939                ChangesetValue::Text("alice".into()),
1940            ]
1941        );
1942        assert_eq!(changeset.tables[0].rows[1].op, ChangeOp::Insert);
1943        assert_eq!(
1944            changeset.tables[0].rows[1].new_values,
1945            vec![
1946                ChangesetValue::Integer(2),
1947                ChangesetValue::Text("alicia".into()),
1948            ]
1949        );
1950    }
1951
1952    #[test]
1953    fn test_session_pk_evolution_coalesces_to_net_effect() {
1954        let mut session = Session::new();
1955        session.attach_table("accounts", 2, vec![true, false]);
1956        session.record_insert(
1957            "accounts",
1958            vec![
1959                ChangesetValue::Integer(1),
1960                ChangesetValue::Text("alice".into()),
1961            ],
1962        );
1963        session.record_update(
1964            "accounts",
1965            vec![
1966                ChangesetValue::Integer(1),
1967                ChangesetValue::Text("alice".into()),
1968            ],
1969            vec![
1970                ChangesetValue::Integer(2),
1971                ChangesetValue::Text("alicia".into()),
1972            ],
1973        );
1974        session.record_delete(
1975            "accounts",
1976            vec![
1977                ChangesetValue::Integer(2),
1978                ChangesetValue::Text("alicia".into()),
1979            ],
1980        );
1981
1982        let changeset = session.changeset();
1983        assert!(changeset.tables.is_empty());
1984    }
1985
1986    #[test]
1987    #[should_panic(expected = "insert values length must match attached table column_count")]
1988    fn test_session_changeset_rejects_malformed_insert_attached_after_recording() {
1989        let mut session = Session::new();
1990        session.record_insert("accounts", vec![ChangesetValue::Integer(1)]);
1991        session.attach_table("accounts", 2, vec![true, false]);
1992
1993        let _ = session.changeset();
1994    }
1995
1996    #[test]
1997    fn test_changeset_roundtrip() {
1998        let mut session = Session::new();
1999        session.attach_table("users", 3, vec![true, false, false]);
2000        session.record_insert(
2001            "users",
2002            vec![
2003                ChangesetValue::Integer(1),
2004                ChangesetValue::Text("Alice".into()),
2005                ChangesetValue::Integer(30),
2006            ],
2007        );
2008        session.record_insert(
2009            "users",
2010            vec![
2011                ChangesetValue::Integer(2),
2012                ChangesetValue::Text("Bob".into()),
2013                ChangesetValue::Integer(25),
2014            ],
2015        );
2016        session.record_delete(
2017            "users",
2018            vec![
2019                ChangesetValue::Integer(1),
2020                ChangesetValue::Text("Alice".into()),
2021                ChangesetValue::Integer(30),
2022            ],
2023        );
2024        session.record_update(
2025            "users",
2026            vec![
2027                ChangesetValue::Integer(2),
2028                ChangesetValue::Text("Bob".into()),
2029                ChangesetValue::Integer(25),
2030            ],
2031            vec![
2032                ChangesetValue::Undefined,
2033                ChangesetValue::Text("Robert".into()),
2034                ChangesetValue::Undefined,
2035            ],
2036        );
2037
2038        let cs = session.changeset();
2039        let encoded = cs.encode();
2040        let decoded = Changeset::decode(&encoded).unwrap();
2041        assert_eq!(decoded, cs);
2042    }
2043
2044    // -----------------------------------------------------------------------
2045    // Changeset inversion
2046    // -----------------------------------------------------------------------
2047
2048    #[test]
2049    fn test_changeset_invert_insert() {
2050        let row = ChangesetRow {
2051            op: ChangeOp::Insert,
2052            old_values: Vec::new(),
2053            new_values: vec![ChangesetValue::Integer(1)],
2054        };
2055        let inv = row.invert();
2056        assert_eq!(inv.op, ChangeOp::Delete);
2057        assert_eq!(inv.old_values, vec![ChangesetValue::Integer(1)]);
2058        assert!(inv.new_values.is_empty());
2059    }
2060
2061    #[test]
2062    fn test_changeset_invert_delete() {
2063        let row = ChangesetRow {
2064            op: ChangeOp::Delete,
2065            old_values: vec![ChangesetValue::Integer(1)],
2066            new_values: Vec::new(),
2067        };
2068        let inv = row.invert();
2069        assert_eq!(inv.op, ChangeOp::Insert);
2070        assert!(inv.old_values.is_empty());
2071        assert_eq!(inv.new_values, vec![ChangesetValue::Integer(1)]);
2072    }
2073
2074    #[test]
2075    fn test_changeset_invert_update() {
2076        let row = ChangesetRow {
2077            op: ChangeOp::Update,
2078            old_values: vec![
2079                ChangesetValue::Integer(1),
2080                ChangesetValue::Text("old".into()),
2081            ],
2082            new_values: vec![
2083                ChangesetValue::Undefined,
2084                ChangesetValue::Text("new".into()),
2085            ],
2086        };
2087        let inv = row.invert();
2088        assert_eq!(inv.op, ChangeOp::Update);
2089        assert_eq!(inv.old_values[0], ChangesetValue::Undefined);
2090        assert_eq!(inv.old_values[1], ChangesetValue::Text("new".into()));
2091        assert_eq!(inv.new_values[0], ChangesetValue::Integer(1));
2092        assert_eq!(inv.new_values[1], ChangesetValue::Text("old".into()));
2093    }
2094
2095    // -----------------------------------------------------------------------
2096    // Changeset concat
2097    // -----------------------------------------------------------------------
2098
2099    #[test]
2100    fn test_changeset_concat() {
2101        let mut cs1 = Changeset::new();
2102        cs1.tables.push(TableChangeset {
2103            info: TableInfo {
2104                name: "a".to_owned(),
2105                column_count: 1,
2106                pk_flags: vec![true],
2107            },
2108            rows: vec![ChangesetRow {
2109                op: ChangeOp::Insert,
2110                old_values: Vec::new(),
2111                new_values: vec![ChangesetValue::Integer(1)],
2112            }],
2113        });
2114        let cs2 = Changeset {
2115            tables: vec![TableChangeset {
2116                info: TableInfo {
2117                    name: "b".to_owned(),
2118                    column_count: 1,
2119                    pk_flags: vec![true],
2120                },
2121                rows: vec![ChangesetRow {
2122                    op: ChangeOp::Insert,
2123                    old_values: Vec::new(),
2124                    new_values: vec![ChangesetValue::Integer(2)],
2125                }],
2126            }],
2127        };
2128        cs1.concat(&cs2);
2129        assert_eq!(cs1.tables.len(), 2);
2130    }
2131
2132    // -----------------------------------------------------------------------
2133    // Patchset format
2134    // -----------------------------------------------------------------------
2135
2136    #[test]
2137    fn test_patchset_format_omits_old_values() {
2138        let mut session = Session::new();
2139        session.attach_table("t", 3, vec![true, false, false]);
2140        session.record_update(
2141            "t",
2142            vec![
2143                ChangesetValue::Integer(1),
2144                ChangesetValue::Text("old_name".into()),
2145                ChangesetValue::Integer(100),
2146            ],
2147            vec![
2148                ChangesetValue::Undefined,
2149                ChangesetValue::Text("new_name".into()),
2150                ChangesetValue::Undefined,
2151            ],
2152        );
2153        let changeset_bytes = session.changeset().encode();
2154        let patchset_bytes = session.patchset();
2155        // Patchset should be smaller (omits non-PK old values).
2156        assert!(
2157            patchset_bytes.len() < changeset_bytes.len(),
2158            "patchset ({}) should be smaller than changeset ({})",
2159            patchset_bytes.len(),
2160            changeset_bytes.len(),
2161        );
2162    }
2163
2164    #[test]
2165    fn test_patchset_update_uses_single_record_layout() {
2166        let pk_flags = vec![true, false, false];
2167        let row = ChangesetRow {
2168            op: ChangeOp::Update,
2169            old_values: vec![
2170                ChangesetValue::Integer(1),
2171                ChangesetValue::Text("old_name".into()),
2172                ChangesetValue::Integer(100),
2173            ],
2174            new_values: vec![
2175                ChangesetValue::Undefined,
2176                ChangesetValue::Text("new_name".into()),
2177                ChangesetValue::Undefined,
2178            ],
2179        };
2180
2181        let mut patchset = Vec::new();
2182        row.encode_patchset(&mut patchset, &pk_flags);
2183
2184        let mut expected = vec![OP_UPDATE, 0x00];
2185        ChangesetValue::Integer(1).encode(&mut expected);
2186        ChangesetValue::Text("new_name".into()).encode(&mut expected);
2187        ChangesetValue::Undefined.encode(&mut expected);
2188        assert_eq!(patchset, expected);
2189    }
2190
2191    #[test]
2192    fn test_patchset_insert_same_as_changeset() {
2193        let mut session = Session::new();
2194        session.attach_table("t", 2, vec![true, false]);
2195        session.record_insert(
2196            "t",
2197            vec![ChangesetValue::Integer(1), ChangesetValue::Text("a".into())],
2198        );
2199        let changeset_bytes = session.changeset().encode();
2200        let patchset_bytes = session.patchset();
2201        assert_eq!(changeset_bytes[1..], patchset_bytes[1..]);
2202        assert_eq!(changeset_bytes[0], CHANGESET_TABLE_HEADER_BYTE);
2203        assert_eq!(patchset_bytes[0], PATCHSET_TABLE_HEADER_BYTE);
2204    }
2205
2206    #[test]
2207    fn test_patchset_decode_rehydrates_pk_old_values() {
2208        let mut session = Session::new();
2209        session.attach_table("t", 3, vec![true, false, false]);
2210        session.record_update(
2211            "t",
2212            vec![
2213                ChangesetValue::Integer(1),
2214                ChangesetValue::Text("old_name".into()),
2215                ChangesetValue::Integer(100),
2216            ],
2217            vec![
2218                ChangesetValue::Undefined,
2219                ChangesetValue::Text("new_name".into()),
2220                ChangesetValue::Undefined,
2221            ],
2222        );
2223
2224        let patchset_bytes = session.patchset();
2225        let decoded = Changeset::decode_patchset(&patchset_bytes).unwrap();
2226        let row = &decoded.tables[0].rows[0];
2227
2228        assert_eq!(decoded.encode_patchset(), patchset_bytes);
2229        assert_eq!(
2230            row.old_values,
2231            vec![
2232                ChangesetValue::Integer(1),
2233                ChangesetValue::Undefined,
2234                ChangesetValue::Undefined,
2235            ]
2236        );
2237        assert_eq!(
2238            row.new_values,
2239            vec![
2240                ChangesetValue::Undefined,
2241                ChangesetValue::Text("new_name".into()),
2242                ChangesetValue::Undefined,
2243            ]
2244        );
2245    }
2246
2247    #[test]
2248    fn test_patchset_decode_truncated_update_returns_none() {
2249        let mut session = Session::new();
2250        session.attach_table("t", 3, vec![true, false, false]);
2251        session.record_update(
2252            "t",
2253            vec![
2254                ChangesetValue::Integer(1),
2255                ChangesetValue::Text("old_name".into()),
2256                ChangesetValue::Integer(100),
2257            ],
2258            vec![
2259                ChangesetValue::Undefined,
2260                ChangesetValue::Text("new_name".into()),
2261                ChangesetValue::Undefined,
2262            ],
2263        );
2264
2265        let mut patchset_bytes = session.patchset();
2266        patchset_bytes.pop();
2267        assert!(Changeset::decode_patchset(&patchset_bytes).is_none());
2268    }
2269
2270    #[test]
2271    fn test_patchset_apply_matches_changeset_apply() {
2272        let mut session = Session::new();
2273        session.attach_table("accounts", 3, vec![true, false, false]);
2274        session.record_insert(
2275            "accounts",
2276            vec![
2277                ChangesetValue::Integer(1),
2278                ChangesetValue::Text("alice".into()),
2279                ChangesetValue::Integer(100),
2280            ],
2281        );
2282        session.record_insert(
2283            "accounts",
2284            vec![
2285                ChangesetValue::Integer(2),
2286                ChangesetValue::Text("bob".into()),
2287                ChangesetValue::Integer(50),
2288            ],
2289        );
2290        session.record_update(
2291            "accounts",
2292            vec![
2293                ChangesetValue::Integer(2),
2294                ChangesetValue::Text("bob".into()),
2295                ChangesetValue::Integer(50),
2296            ],
2297            vec![
2298                ChangesetValue::Undefined,
2299                ChangesetValue::Undefined,
2300                ChangesetValue::Integer(75),
2301            ],
2302        );
2303        session.record_delete(
2304            "accounts",
2305            vec![
2306                ChangesetValue::Integer(1),
2307                ChangesetValue::Text("alice".into()),
2308                ChangesetValue::Integer(100),
2309            ],
2310        );
2311
2312        let changeset = session.changeset();
2313        let decoded_patchset = Changeset::decode_patchset(&session.patchset()).unwrap();
2314
2315        let mut changeset_target = SimpleTarget::default();
2316        let mut patchset_target = SimpleTarget::default();
2317        let changeset_outcome = changeset_target.apply(&changeset, |_, _| ConflictAction::Abort);
2318        let patchset_outcome =
2319            patchset_target.apply(&decoded_patchset, |_, _| ConflictAction::Abort);
2320
2321        assert_eq!(
2322            changeset_outcome,
2323            ApplyOutcome::Success {
2324                applied: 1,
2325                skipped: 0,
2326            }
2327        );
2328        assert_eq!(patchset_outcome, changeset_outcome);
2329        assert_eq!(patchset_target.tables, changeset_target.tables);
2330        assert_eq!(
2331            changeset_target.tables["accounts"],
2332            vec![vec![
2333                SqliteValue::Integer(2),
2334                SqliteValue::Text("bob".into()),
2335                SqliteValue::Integer(75),
2336            ]]
2337        );
2338    }
2339
2340    // -----------------------------------------------------------------------
2341    // Apply — successful cases
2342    // -----------------------------------------------------------------------
2343
2344    #[test]
2345    fn test_apply_insert() {
2346        let cs = Changeset {
2347            tables: vec![TableChangeset {
2348                info: TableInfo {
2349                    name: "t".to_owned(),
2350                    column_count: 2,
2351                    pk_flags: vec![true, false],
2352                },
2353                rows: vec![ChangesetRow {
2354                    op: ChangeOp::Insert,
2355                    old_values: Vec::new(),
2356                    new_values: vec![
2357                        ChangesetValue::Integer(1),
2358                        ChangesetValue::Text("hello".into()),
2359                    ],
2360                }],
2361            }],
2362        };
2363
2364        let mut target = SimpleTarget::default();
2365        let outcome = target.apply(&cs, |_, _| ConflictAction::Abort);
2366        assert_eq!(
2367            outcome,
2368            ApplyOutcome::Success {
2369                applied: 1,
2370                skipped: 0
2371            }
2372        );
2373        assert_eq!(
2374            target.tables["t"],
2375            vec![vec![
2376                SqliteValue::Integer(1),
2377                SqliteValue::Text("hello".into())
2378            ]]
2379        );
2380    }
2381
2382    #[test]
2383    fn test_apply_delete() {
2384        let mut target = SimpleTarget::default();
2385        target.tables.insert(
2386            "t".to_owned(),
2387            vec![vec![
2388                SqliteValue::Integer(1),
2389                SqliteValue::Text("hello".into()),
2390            ]],
2391        );
2392
2393        let cs = Changeset {
2394            tables: vec![TableChangeset {
2395                info: TableInfo {
2396                    name: "t".to_owned(),
2397                    column_count: 2,
2398                    pk_flags: vec![true, false],
2399                },
2400                rows: vec![ChangesetRow {
2401                    op: ChangeOp::Delete,
2402                    old_values: vec![
2403                        ChangesetValue::Integer(1),
2404                        ChangesetValue::Text("hello".into()),
2405                    ],
2406                    new_values: Vec::new(),
2407                }],
2408            }],
2409        };
2410
2411        let outcome = target.apply(&cs, |_, _| ConflictAction::Abort);
2412        assert_eq!(
2413            outcome,
2414            ApplyOutcome::Success {
2415                applied: 1,
2416                skipped: 0
2417            }
2418        );
2419        assert!(target.tables["t"].is_empty());
2420    }
2421
2422    #[test]
2423    fn test_apply_update() {
2424        let mut target = SimpleTarget::default();
2425        target.tables.insert(
2426            "t".to_owned(),
2427            vec![vec![
2428                SqliteValue::Integer(1),
2429                SqliteValue::Text("old".into()),
2430            ]],
2431        );
2432
2433        let cs = Changeset {
2434            tables: vec![TableChangeset {
2435                info: TableInfo {
2436                    name: "t".to_owned(),
2437                    column_count: 2,
2438                    pk_flags: vec![true, false],
2439                },
2440                rows: vec![ChangesetRow {
2441                    op: ChangeOp::Update,
2442                    old_values: vec![
2443                        ChangesetValue::Integer(1),
2444                        ChangesetValue::Text("old".into()),
2445                    ],
2446                    new_values: vec![
2447                        ChangesetValue::Undefined,
2448                        ChangesetValue::Text("new".into()),
2449                    ],
2450                }],
2451            }],
2452        };
2453
2454        let outcome = target.apply(&cs, |_, _| ConflictAction::Abort);
2455        assert_eq!(
2456            outcome,
2457            ApplyOutcome::Success {
2458                applied: 1,
2459                skipped: 0
2460            }
2461        );
2462        assert_eq!(
2463            target.tables["t"][0],
2464            vec![SqliteValue::Integer(1), SqliteValue::Text("new".into())]
2465        );
2466    }
2467
2468    // -----------------------------------------------------------------------
2469    // Apply — conflict scenarios
2470    // -----------------------------------------------------------------------
2471
2472    #[test]
2473    fn test_conflict_not_found() {
2474        let cs = Changeset {
2475            tables: vec![TableChangeset {
2476                info: TableInfo {
2477                    name: "t".to_owned(),
2478                    column_count: 1,
2479                    pk_flags: vec![true],
2480                },
2481                rows: vec![ChangesetRow {
2482                    op: ChangeOp::Delete,
2483                    old_values: vec![ChangesetValue::Integer(999)],
2484                    new_values: Vec::new(),
2485                }],
2486            }],
2487        };
2488        let mut target = SimpleTarget::default();
2489        let mut conflict_seen = None;
2490        let outcome = target.apply(&cs, |ct, _| {
2491            conflict_seen = Some(ct);
2492            ConflictAction::OmitChange
2493        });
2494        assert_eq!(conflict_seen, Some(ConflictType::NotFound));
2495        assert_eq!(
2496            outcome,
2497            ApplyOutcome::Success {
2498                applied: 0,
2499                skipped: 1
2500            }
2501        );
2502    }
2503
2504    #[test]
2505    fn test_conflict_data() {
2506        let mut target = SimpleTarget::default();
2507        target.tables.insert(
2508            "t".to_owned(),
2509            vec![vec![
2510                SqliteValue::Integer(1),
2511                SqliteValue::Text("actual".into()),
2512            ]],
2513        );
2514
2515        let cs = Changeset {
2516            tables: vec![TableChangeset {
2517                info: TableInfo {
2518                    name: "t".to_owned(),
2519                    column_count: 2,
2520                    pk_flags: vec![true, false],
2521                },
2522                rows: vec![ChangesetRow {
2523                    op: ChangeOp::Delete,
2524                    old_values: vec![
2525                        ChangesetValue::Integer(1),
2526                        ChangesetValue::Text("expected".into()),
2527                    ],
2528                    new_values: Vec::new(),
2529                }],
2530            }],
2531        };
2532
2533        let mut conflict_seen = None;
2534        let outcome = target.apply(&cs, |ct, _| {
2535            conflict_seen = Some(ct);
2536            ConflictAction::OmitChange
2537        });
2538        assert_eq!(conflict_seen, Some(ConflictType::Data));
2539        assert_eq!(
2540            outcome,
2541            ApplyOutcome::Success {
2542                applied: 0,
2543                skipped: 1
2544            }
2545        );
2546    }
2547
2548    #[test]
2549    fn test_conflict_unique_insert() {
2550        let mut target = SimpleTarget::default();
2551        target
2552            .tables
2553            .insert("t".to_owned(), vec![vec![SqliteValue::Integer(1)]]);
2554
2555        let cs = Changeset {
2556            tables: vec![TableChangeset {
2557                info: TableInfo {
2558                    name: "t".to_owned(),
2559                    column_count: 1,
2560                    pk_flags: vec![true],
2561                },
2562                rows: vec![ChangesetRow {
2563                    op: ChangeOp::Insert,
2564                    old_values: Vec::new(),
2565                    new_values: vec![ChangesetValue::Integer(1)], // Duplicate PK
2566                }],
2567            }],
2568        };
2569
2570        let mut conflict_seen = None;
2571        let outcome = target.apply(&cs, |ct, _| {
2572            conflict_seen = Some(ct);
2573            ConflictAction::OmitChange
2574        });
2575        assert_eq!(conflict_seen, Some(ConflictType::Conflict));
2576        assert_eq!(
2577            outcome,
2578            ApplyOutcome::Success {
2579                applied: 0,
2580                skipped: 1
2581            }
2582        );
2583    }
2584
2585    #[test]
2586    fn test_conflict_omit_skips() {
2587        let mut target = SimpleTarget::default();
2588        let cs = Changeset {
2589            tables: vec![TableChangeset {
2590                info: TableInfo {
2591                    name: "t".to_owned(),
2592                    column_count: 1,
2593                    pk_flags: vec![true],
2594                },
2595                rows: vec![ChangesetRow {
2596                    op: ChangeOp::Delete,
2597                    old_values: vec![ChangesetValue::Integer(1)],
2598                    new_values: Vec::new(),
2599                }],
2600            }],
2601        };
2602        let outcome = target.apply(&cs, |_, _| ConflictAction::OmitChange);
2603        assert_eq!(
2604            outcome,
2605            ApplyOutcome::Success {
2606                applied: 0,
2607                skipped: 1
2608            }
2609        );
2610    }
2611
2612    #[test]
2613    fn test_conflict_replace_insert() {
2614        let mut target = SimpleTarget::default();
2615        target.tables.insert(
2616            "t".to_owned(),
2617            vec![vec![
2618                SqliteValue::Integer(1),
2619                SqliteValue::Text("old".into()),
2620            ]],
2621        );
2622
2623        let cs = Changeset {
2624            tables: vec![TableChangeset {
2625                info: TableInfo {
2626                    name: "t".to_owned(),
2627                    column_count: 2,
2628                    pk_flags: vec![true, false],
2629                },
2630                rows: vec![ChangesetRow {
2631                    op: ChangeOp::Insert,
2632                    old_values: Vec::new(),
2633                    new_values: vec![
2634                        ChangesetValue::Integer(1),
2635                        ChangesetValue::Text("replaced".into()),
2636                    ],
2637                }],
2638            }],
2639        };
2640
2641        let outcome = target.apply(&cs, |_, _| ConflictAction::Replace);
2642        assert_eq!(
2643            outcome,
2644            ApplyOutcome::Success {
2645                applied: 1,
2646                skipped: 0
2647            }
2648        );
2649        assert_eq!(
2650            target.tables["t"][0],
2651            vec![
2652                SqliteValue::Integer(1),
2653                SqliteValue::Text("replaced".into())
2654            ]
2655        );
2656    }
2657
2658    #[test]
2659    fn test_conflict_abort_stops_apply() {
2660        let mut target = SimpleTarget::default();
2661        let cs = Changeset {
2662            tables: vec![TableChangeset {
2663                info: TableInfo {
2664                    name: "t".to_owned(),
2665                    column_count: 1,
2666                    pk_flags: vec![true],
2667                },
2668                rows: vec![
2669                    ChangesetRow {
2670                        op: ChangeOp::Delete,
2671                        old_values: vec![ChangesetValue::Integer(1)],
2672                        new_values: Vec::new(),
2673                    },
2674                    ChangesetRow {
2675                        op: ChangeOp::Insert,
2676                        old_values: Vec::new(),
2677                        new_values: vec![ChangesetValue::Integer(2)],
2678                    },
2679                ],
2680            }],
2681        };
2682        let outcome = target.apply(&cs, |_, _| ConflictAction::Abort);
2683        assert_eq!(outcome, ApplyOutcome::Aborted { applied: 0 });
2684        // Second row should NOT have been applied.
2685        assert!(!target.tables.contains_key("t") || target.tables["t"].is_empty());
2686    }
2687
2688    // -----------------------------------------------------------------------
2689    // Full round-trip: session → changeset → apply → verify
2690    // -----------------------------------------------------------------------
2691
2692    #[test]
2693    fn test_changeset_full_roundtrip() {
2694        // Build changeset via session.
2695        let mut session = Session::new();
2696        session.attach_table("users", 3, vec![true, false, false]);
2697        session.record_insert(
2698            "users",
2699            vec![
2700                ChangesetValue::Integer(1),
2701                ChangesetValue::Text("Alice".into()),
2702                ChangesetValue::Integer(30),
2703            ],
2704        );
2705        session.record_insert(
2706            "users",
2707            vec![
2708                ChangesetValue::Integer(2),
2709                ChangesetValue::Text("Bob".into()),
2710                ChangesetValue::Integer(25),
2711            ],
2712        );
2713
2714        let cs = session.changeset();
2715
2716        // Apply to empty target.
2717        let mut target = SimpleTarget::default();
2718        let outcome = target.apply(&cs, |_, _| ConflictAction::Abort);
2719        assert_eq!(
2720            outcome,
2721            ApplyOutcome::Success {
2722                applied: 2,
2723                skipped: 0
2724            }
2725        );
2726        assert_eq!(target.tables["users"].len(), 2);
2727        assert_eq!(
2728            target.tables["users"][0][1],
2729            SqliteValue::Text("Alice".into())
2730        );
2731        assert_eq!(
2732            target.tables["users"][1][1],
2733            SqliteValue::Text("Bob".into())
2734        );
2735    }
2736
2737    #[test]
2738    fn test_changeset_invert_undoes_changes() {
2739        let mut session = Session::new();
2740        session.attach_table("t", 2, vec![true, false]);
2741        session.record_insert(
2742            "t",
2743            vec![ChangesetValue::Integer(1), ChangesetValue::Text("a".into())],
2744        );
2745
2746        let cs = session.changeset();
2747        let inv = cs.invert();
2748
2749        // Apply original changeset.
2750        let mut target = SimpleTarget::default();
2751        target.apply(&cs, |_, _| ConflictAction::Abort);
2752        assert_eq!(target.tables["t"].len(), 1);
2753
2754        // Apply inverted changeset — should remove the row.
2755        target.apply(&inv, |_, _| ConflictAction::Abort);
2756        assert!(target.tables["t"].is_empty());
2757    }
2758
2759    // -----------------------------------------------------------------------
2760    // ChangesetValue <-> SqliteValue conversion
2761    // -----------------------------------------------------------------------
2762
2763    #[test]
2764    fn test_changeset_value_from_sqlite() {
2765        assert_eq!(
2766            ChangesetValue::from_sqlite(&SqliteValue::Null),
2767            ChangesetValue::Null
2768        );
2769        assert_eq!(
2770            ChangesetValue::from_sqlite(&SqliteValue::Integer(42)),
2771            ChangesetValue::Integer(42)
2772        );
2773        assert_eq!(
2774            ChangesetValue::from_sqlite(&SqliteValue::Float(1.5)),
2775            ChangesetValue::Real(1.5)
2776        );
2777        assert_eq!(
2778            ChangesetValue::from_sqlite(&SqliteValue::Text("x".into())),
2779            ChangesetValue::Text("x".into())
2780        );
2781        assert_eq!(
2782            ChangesetValue::from_sqlite(&SqliteValue::Blob(vec![1, 2].into())),
2783            ChangesetValue::Blob(vec![1, 2])
2784        );
2785    }
2786
2787    #[test]
2788    fn test_changeset_value_to_sqlite() {
2789        assert_eq!(ChangesetValue::Undefined.to_sqlite(), SqliteValue::Null);
2790        assert_eq!(ChangesetValue::Null.to_sqlite(), SqliteValue::Null);
2791        assert_eq!(
2792            ChangesetValue::Integer(7).to_sqlite(),
2793            SqliteValue::Integer(7)
2794        );
2795        assert_eq!(
2796            ChangesetValue::Real(2.5).to_sqlite(),
2797            SqliteValue::Float(2.5)
2798        );
2799        assert_eq!(
2800            ChangesetValue::Text("hi".into()).to_sqlite(),
2801            SqliteValue::Text("hi".into())
2802        );
2803        assert_eq!(
2804            ChangesetValue::Blob(vec![0xAB]).to_sqlite(),
2805            SqliteValue::Blob(vec![0xAB].into())
2806        );
2807    }
2808
2809    // -----------------------------------------------------------------------
2810    // ChangeOp edge cases
2811    // -----------------------------------------------------------------------
2812
2813    #[test]
2814    fn test_change_op_from_byte_exhaustive_invalid() {
2815        for b in 0..=255u8 {
2816            if matches!(b, 0x12 | 0x09 | 0x17) {
2817                assert!(ChangeOp::from_byte(b).is_some());
2818            } else {
2819                assert!(
2820                    ChangeOp::from_byte(b).is_none(),
2821                    "byte {b:#x} should be None"
2822                );
2823            }
2824        }
2825    }
2826
2827    #[test]
2828    fn test_change_op_copy_clone_eq() {
2829        let a = ChangeOp::Insert;
2830        let b = a;
2831        assert_eq!(a, b);
2832        assert_ne!(ChangeOp::Insert, ChangeOp::Delete);
2833        assert_ne!(ChangeOp::Delete, ChangeOp::Update);
2834    }
2835
2836    #[test]
2837    fn test_change_op_debug() {
2838        let s = format!("{:?}", ChangeOp::Insert);
2839        assert_eq!(s, "Insert");
2840    }
2841
2842    // -----------------------------------------------------------------------
2843    // ChangesetValue edge cases
2844    // -----------------------------------------------------------------------
2845
2846    #[test]
2847    fn test_changeset_value_integer_boundaries() {
2848        for &val in &[i64::MIN, i64::MAX, 0, -1, 1] {
2849            let mut buf = Vec::new();
2850            ChangesetValue::Integer(val).encode(&mut buf);
2851            let (decoded, _) = ChangesetValue::decode(&buf, 0).unwrap();
2852            assert_eq!(decoded, ChangesetValue::Integer(val));
2853        }
2854    }
2855
2856    #[test]
2857    fn test_changeset_value_real_special() {
2858        for &val in &[
2859            0.0,
2860            -0.0,
2861            f64::MAX,
2862            f64::MIN,
2863            f64::MIN_POSITIVE,
2864            f64::EPSILON,
2865        ] {
2866            let mut buf = Vec::new();
2867            ChangesetValue::Real(val).encode(&mut buf);
2868            let (decoded, _) = ChangesetValue::decode(&buf, 0).unwrap();
2869            assert_eq!(decoded, ChangesetValue::Real(val));
2870        }
2871    }
2872
2873    #[test]
2874    fn test_changeset_value_real_nan_roundtrip() {
2875        let mut buf = Vec::new();
2876        ChangesetValue::Real(f64::NAN).encode(&mut buf);
2877        let (decoded, _) = ChangesetValue::decode(&buf, 0).unwrap();
2878        if let ChangesetValue::Real(f) = decoded {
2879            assert!(f.is_nan());
2880        } else {
2881            panic!("expected Real");
2882        }
2883    }
2884
2885    #[test]
2886    fn test_changeset_value_blob_empty() {
2887        let mut buf = Vec::new();
2888        ChangesetValue::Blob(Vec::new()).encode(&mut buf);
2889        let (decoded, consumed) = ChangesetValue::decode(&buf, 0).unwrap();
2890        assert_eq!(decoded, ChangesetValue::Blob(Vec::new()));
2891        assert_eq!(consumed, 2); // type + varint(0)
2892    }
2893
2894    #[test]
2895    fn test_changeset_value_text_unicode() {
2896        let text = "\u{1F600}\u{1F4A9}\u{2603}"; // emoji + snowman
2897        let mut buf = Vec::new();
2898        ChangesetValue::Text(text.into()).encode(&mut buf);
2899        let (decoded, _) = ChangesetValue::decode(&buf, 0).unwrap();
2900        assert_eq!(decoded, ChangesetValue::Text(text.into()));
2901    }
2902
2903    #[test]
2904    fn test_changeset_value_decode_at_offset() {
2905        let mut buf = Vec::new();
2906        ChangesetValue::Null.encode(&mut buf); // 1 byte
2907        ChangesetValue::Integer(42).encode(&mut buf); // 9 bytes
2908        let (val, consumed) = ChangesetValue::decode(&buf, 1).unwrap();
2909        assert_eq!(val, ChangesetValue::Integer(42));
2910        assert_eq!(consumed, 9);
2911    }
2912
2913    #[test]
2914    fn test_changeset_value_decode_empty_slice() {
2915        assert!(ChangesetValue::decode(&[], 0).is_none());
2916    }
2917
2918    #[test]
2919    fn test_changeset_value_decode_offset_beyond_len() {
2920        assert!(ChangesetValue::decode(&[VAL_NULL], 5).is_none());
2921    }
2922
2923    #[test]
2924    fn test_changeset_value_decode_truncated_real() {
2925        assert!(ChangesetValue::decode(&[VAL_REAL, 0, 0, 0], 0).is_none());
2926    }
2927
2928    #[test]
2929    fn test_changeset_value_decode_truncated_text() {
2930        // Type byte + varint(10) but only 3 content bytes
2931        let mut buf = vec![VAL_TEXT, 10, b'a', b'b', b'c'];
2932        assert!(ChangesetValue::decode(&buf, 0).is_none());
2933        // Fix: provide exactly 10 bytes
2934        buf.extend_from_slice(&[0; 7]);
2935        // Non-UTF8 bytes should fail
2936        buf[5] = 0xFF;
2937        assert!(ChangesetValue::decode(&buf, 0).is_none());
2938    }
2939
2940    #[test]
2941    fn test_changeset_value_decode_truncated_blob() {
2942        let buf = vec![VAL_BLOB, 5, 1, 2]; // says 5 bytes, only has 2
2943        assert!(ChangesetValue::decode(&buf, 0).is_none());
2944    }
2945
2946    // -----------------------------------------------------------------------
2947    // ChangesetValue <-> SqliteValue round-trip
2948    // -----------------------------------------------------------------------
2949
2950    #[test]
2951    #[allow(clippy::approx_constant)]
2952    fn test_changeset_value_sqlite_roundtrip_all_types() {
2953        let values = vec![
2954            SqliteValue::Null,
2955            SqliteValue::Integer(0),
2956            SqliteValue::Integer(i64::MAX),
2957            SqliteValue::Float(3.14),
2958            SqliteValue::Text("".into()),
2959            SqliteValue::Text("test".into()),
2960            SqliteValue::Blob(vec![].into()),
2961            SqliteValue::Blob(vec![1, 2, 3].into()),
2962        ];
2963        for sv in &values {
2964            let cv = ChangesetValue::from_sqlite(sv);
2965            let back = cv.to_sqlite();
2966            assert_eq!(&back, sv);
2967        }
2968    }
2969
2970    // -----------------------------------------------------------------------
2971    // TableInfo edge cases
2972    // -----------------------------------------------------------------------
2973
2974    #[test]
2975    fn test_table_info_single_column() {
2976        let info = TableInfo {
2977            name: "x".to_owned(),
2978            column_count: 1,
2979            pk_flags: vec![true],
2980        };
2981        let mut buf = Vec::new();
2982        info.encode(&mut buf);
2983        let (decoded, consumed) = TableInfo::decode(&buf, 0).unwrap();
2984        assert_eq!(decoded, info);
2985        assert_eq!(consumed, buf.len());
2986    }
2987
2988    #[test]
2989    fn test_table_info_no_pk_columns() {
2990        let info = TableInfo {
2991            name: "t".to_owned(),
2992            column_count: 3,
2993            pk_flags: vec![false, false, false],
2994        };
2995        let mut buf = Vec::new();
2996        info.encode(&mut buf);
2997        let (decoded, _) = TableInfo::decode(&buf, 0).unwrap();
2998        assert_eq!(decoded.pk_flags, vec![false, false, false]);
2999    }
3000
3001    #[test]
3002    fn test_table_info_unicode_name() {
3003        let info = TableInfo {
3004            name: "\u{00FC}berschrift".to_owned(),
3005            column_count: 1,
3006            pk_flags: vec![true],
3007        };
3008        let mut buf = Vec::new();
3009        info.encode(&mut buf);
3010        let (decoded, _) = TableInfo::decode(&buf, 0).unwrap();
3011        assert_eq!(decoded.name, "\u{00FC}berschrift");
3012    }
3013
3014    #[test]
3015    fn test_table_info_decode_wrong_header() {
3016        assert!(TableInfo::decode(&[0x00, 0x01, 0x01, b't', 0x00], 0).is_none());
3017    }
3018
3019    #[test]
3020    fn test_table_info_decode_truncated() {
3021        assert!(TableInfo::decode(&[TABLE_HEADER_BYTE], 0).is_none());
3022        assert!(TableInfo::decode(&[TABLE_HEADER_BYTE, 3, 1], 0).is_none());
3023    }
3024
3025    #[test]
3026    fn test_table_info_decode_at_offset() {
3027        let mut buf = vec![0xFF, 0xFF]; // padding
3028        let info = TableInfo {
3029            name: "t".to_owned(),
3030            column_count: 1,
3031            pk_flags: vec![true],
3032        };
3033        info.encode(&mut buf);
3034        let (decoded, _) = TableInfo::decode(&buf, 2).unwrap();
3035        assert_eq!(decoded, info);
3036    }
3037
3038    // -----------------------------------------------------------------------
3039    // ChangesetRow edge cases
3040    // -----------------------------------------------------------------------
3041
3042    #[test]
3043    fn test_changeset_row_invert_double_is_identity() {
3044        let row = ChangesetRow {
3045            op: ChangeOp::Update,
3046            old_values: vec![
3047                ChangesetValue::Integer(1),
3048                ChangesetValue::Text("old".into()),
3049            ],
3050            new_values: vec![
3051                ChangesetValue::Undefined,
3052                ChangesetValue::Text("new".into()),
3053            ],
3054        };
3055        let double_inverted = row.invert().invert();
3056        assert_eq!(double_inverted, row);
3057    }
3058
3059    #[test]
3060    fn test_changeset_row_encode_decode_all_ops() {
3061        let col_count = 2;
3062        for op in [ChangeOp::Insert, ChangeOp::Delete, ChangeOp::Update] {
3063            let row = match op {
3064                ChangeOp::Insert => ChangesetRow {
3065                    op,
3066                    old_values: Vec::new(),
3067                    new_values: vec![ChangesetValue::Integer(1), ChangesetValue::Null],
3068                },
3069                ChangeOp::Delete => ChangesetRow {
3070                    op,
3071                    old_values: vec![ChangesetValue::Integer(1), ChangesetValue::Null],
3072                    new_values: Vec::new(),
3073                },
3074                ChangeOp::Update => ChangesetRow {
3075                    op,
3076                    old_values: vec![ChangesetValue::Integer(1), ChangesetValue::Text("a".into())],
3077                    new_values: vec![ChangesetValue::Undefined, ChangesetValue::Text("b".into())],
3078                },
3079            };
3080            let mut buf = Vec::new();
3081            row.encode_changeset(&mut buf);
3082            let (decoded, consumed) = ChangesetRow::decode_changeset(&buf, 0, col_count).unwrap();
3083            assert_eq!(decoded, row);
3084            assert_eq!(consumed, buf.len());
3085        }
3086    }
3087
3088    #[test]
3089    fn test_changeset_row_decode_bad_op() {
3090        assert!(ChangesetRow::decode_changeset(&[0xFF, VAL_NULL], 0, 1).is_none());
3091    }
3092
3093    // -----------------------------------------------------------------------
3094    // Patchset UPDATE: PK-only old values
3095    // -----------------------------------------------------------------------
3096
3097    #[test]
3098    fn test_patchset_update_only_pk_old() {
3099        let pk_flags = vec![true, false, false];
3100        let row = ChangesetRow {
3101            op: ChangeOp::Update,
3102            old_values: vec![
3103                ChangesetValue::Integer(1),
3104                ChangesetValue::Text("old_name".into()),
3105                ChangesetValue::Integer(100),
3106            ],
3107            new_values: vec![
3108                ChangesetValue::Undefined,
3109                ChangesetValue::Text("new_name".into()),
3110                ChangesetValue::Undefined,
3111            ],
3112        };
3113        let mut cs_buf = Vec::new();
3114        row.encode_changeset(&mut cs_buf);
3115        let mut ps_buf = Vec::new();
3116        row.encode_patchset(&mut ps_buf, &pk_flags);
3117        assert!(ps_buf.len() < cs_buf.len());
3118    }
3119
3120    #[test]
3121    fn test_patchset_delete_omits_non_pk_old_values() {
3122        let pk_flags = vec![true, false];
3123        let row = ChangesetRow {
3124            op: ChangeOp::Delete,
3125            old_values: vec![ChangesetValue::Integer(1), ChangesetValue::Text("a".into())],
3126            new_values: Vec::new(),
3127        };
3128        let mut cs_buf = Vec::new();
3129        row.encode_changeset(&mut cs_buf);
3130        let mut ps_buf = Vec::new();
3131        row.encode_patchset(&mut ps_buf, &pk_flags);
3132        assert!(ps_buf.len() < cs_buf.len());
3133    }
3134
3135    // -----------------------------------------------------------------------
3136    // Session: explicit PK requirement
3137    // -----------------------------------------------------------------------
3138
3139    #[test]
3140    fn test_session_unattached_table_ignored() {
3141        let mut session = Session::new();
3142        session.record_insert("auto", vec![ChangesetValue::Integer(1)]);
3143        let cs = session.changeset();
3144        assert!(cs.tables.is_empty());
3145    }
3146
3147    #[test]
3148    fn test_session_attached_table_without_pk_is_ignored() {
3149        let mut session = Session::new();
3150        session.attach_table("auto", 2, vec![false, false]);
3151        session.record_insert(
3152            "auto",
3153            vec![ChangesetValue::Integer(1), ChangesetValue::Text("a".into())],
3154        );
3155        let cs = session.changeset();
3156        assert!(cs.tables.is_empty());
3157    }
3158
3159    #[test]
3160    fn test_session_rows_with_null_primary_key_are_ignored() {
3161        let mut session = Session::new();
3162        session.attach_table("accounts", 2, vec![true, false]);
3163        session.record_insert(
3164            "accounts",
3165            vec![ChangesetValue::Null, ChangesetValue::Text("alice".into())],
3166        );
3167        session.record_update(
3168            "accounts",
3169            vec![ChangesetValue::Null, ChangesetValue::Text("alice".into())],
3170            vec![
3171                ChangesetValue::Undefined,
3172                ChangesetValue::Text("alice_2".into()),
3173            ],
3174        );
3175        session.record_delete(
3176            "accounts",
3177            vec![ChangesetValue::Null, ChangesetValue::Text("alice_2".into())],
3178        );
3179
3180        let changeset = session.changeset();
3181        assert!(changeset.tables.is_empty());
3182    }
3183
3184    #[test]
3185    fn test_session_empty_changeset() {
3186        let session = Session::new();
3187        let cs = session.changeset();
3188        assert!(cs.tables.is_empty());
3189        assert!(cs.encode().is_empty());
3190    }
3191
3192    #[test]
3193    fn test_session_empty_patchset() {
3194        let session = Session::new();
3195        assert!(session.patchset().is_empty());
3196    }
3197
3198    #[test]
3199    fn test_session_default_trait() {
3200        let session = Session::default();
3201        assert!(session.tables.is_empty());
3202    }
3203
3204    // -----------------------------------------------------------------------
3205    // Changeset edge cases
3206    // -----------------------------------------------------------------------
3207
3208    #[test]
3209    fn test_changeset_default_trait() {
3210        let cs = Changeset::default();
3211        assert!(cs.tables.is_empty());
3212    }
3213
3214    #[test]
3215    fn test_changeset_empty_encode_decode() {
3216        let cs = Changeset::new();
3217        let encoded = cs.encode();
3218        assert!(encoded.is_empty());
3219        let decoded = Changeset::decode(&encoded).unwrap();
3220        assert!(decoded.tables.is_empty());
3221    }
3222
3223    #[test]
3224    fn test_changeset_invert_is_self_inverse() {
3225        let mut session = Session::new();
3226        session.attach_table("t", 2, vec![true, false]);
3227        session.record_insert(
3228            "t",
3229            vec![ChangesetValue::Integer(1), ChangesetValue::Text("a".into())],
3230        );
3231        session.record_delete(
3232            "t",
3233            vec![ChangesetValue::Integer(2), ChangesetValue::Text("b".into())],
3234        );
3235        session.record_update(
3236            "t",
3237            vec![ChangesetValue::Integer(3), ChangesetValue::Text("c".into())],
3238            vec![ChangesetValue::Undefined, ChangesetValue::Text("d".into())],
3239        );
3240
3241        let cs = session.changeset();
3242        let double_inv = cs.invert().invert();
3243        assert_eq!(double_inv, cs);
3244    }
3245
3246    #[test]
3247    fn test_changeset_multi_table_encode_decode() {
3248        let mut session = Session::new();
3249        session.attach_table("a", 1, vec![true]);
3250        session.attach_table("b", 2, vec![true, false]);
3251        session.record_insert("a", vec![ChangesetValue::Integer(1)]);
3252        session.record_insert(
3253            "b",
3254            vec![ChangesetValue::Integer(2), ChangesetValue::Text("x".into())],
3255        );
3256        session.record_delete("a", vec![ChangesetValue::Integer(3)]);
3257
3258        let cs = session.changeset();
3259        let encoded = cs.encode();
3260        let decoded = Changeset::decode(&encoded).unwrap();
3261        assert_eq!(decoded, cs);
3262    }
3263
3264    // -----------------------------------------------------------------------
3265    // Apply: additional conflict scenarios
3266    // -----------------------------------------------------------------------
3267
3268    #[test]
3269    fn test_apply_update_data_conflict_replace() {
3270        let mut target = SimpleTarget::default();
3271        target.tables.insert(
3272            "t".to_owned(),
3273            vec![vec![
3274                SqliteValue::Integer(1),
3275                SqliteValue::Text("actual".into()),
3276            ]],
3277        );
3278
3279        let cs = Changeset {
3280            tables: vec![TableChangeset {
3281                info: TableInfo {
3282                    name: "t".to_owned(),
3283                    column_count: 2,
3284                    pk_flags: vec![true, false],
3285                },
3286                rows: vec![ChangesetRow {
3287                    op: ChangeOp::Update,
3288                    old_values: vec![
3289                        ChangesetValue::Integer(1),
3290                        ChangesetValue::Text("expected".into()),
3291                    ],
3292                    new_values: vec![
3293                        ChangesetValue::Undefined,
3294                        ChangesetValue::Text("new".into()),
3295                    ],
3296                }],
3297            }],
3298        };
3299
3300        let outcome = target.apply(&cs, |_, _| ConflictAction::Replace);
3301        assert_eq!(
3302            outcome,
3303            ApplyOutcome::Success {
3304                applied: 1,
3305                skipped: 0
3306            }
3307        );
3308        assert_eq!(target.tables["t"][0][1], SqliteValue::Text("new".into()));
3309    }
3310
3311    #[test]
3312    fn test_apply_update_pk_conflict_omit() {
3313        let mut target = SimpleTarget::default();
3314        target.tables.insert(
3315            "t".to_owned(),
3316            vec![
3317                vec![SqliteValue::Integer(1), SqliteValue::Text("alice".into())],
3318                vec![SqliteValue::Integer(2), SqliteValue::Text("bob".into())],
3319            ],
3320        );
3321
3322        let cs = Changeset {
3323            tables: vec![TableChangeset {
3324                info: TableInfo {
3325                    name: "t".to_owned(),
3326                    column_count: 2,
3327                    pk_flags: vec![true, false],
3328                },
3329                rows: vec![ChangesetRow {
3330                    op: ChangeOp::Update,
3331                    old_values: vec![
3332                        ChangesetValue::Integer(1),
3333                        ChangesetValue::Text("alice".into()),
3334                    ],
3335                    new_values: vec![
3336                        ChangesetValue::Integer(2),
3337                        ChangesetValue::Text("ally".into()),
3338                    ],
3339                }],
3340            }],
3341        };
3342
3343        let mut conflict_seen = None;
3344        let outcome = target.apply(&cs, |conflict, _| {
3345            conflict_seen = Some(conflict);
3346            ConflictAction::OmitChange
3347        });
3348        assert_eq!(conflict_seen, Some(ConflictType::Conflict));
3349        assert_eq!(
3350            outcome,
3351            ApplyOutcome::Success {
3352                applied: 0,
3353                skipped: 1,
3354            }
3355        );
3356        assert_eq!(
3357            target.tables["t"],
3358            vec![
3359                vec![SqliteValue::Integer(1), SqliteValue::Text("alice".into()),],
3360                vec![SqliteValue::Integer(2), SqliteValue::Text("bob".into()),],
3361            ]
3362        );
3363    }
3364
3365    #[test]
3366    fn test_apply_update_pk_conflict_replace_overwrites_conflicting_row() {
3367        let mut target = SimpleTarget::default();
3368        target.tables.insert(
3369            "t".to_owned(),
3370            vec![
3371                vec![SqliteValue::Integer(1), SqliteValue::Text("alice".into())],
3372                vec![SqliteValue::Integer(2), SqliteValue::Text("bob".into())],
3373            ],
3374        );
3375
3376        let cs = Changeset {
3377            tables: vec![TableChangeset {
3378                info: TableInfo {
3379                    name: "t".to_owned(),
3380                    column_count: 2,
3381                    pk_flags: vec![true, false],
3382                },
3383                rows: vec![ChangesetRow {
3384                    op: ChangeOp::Update,
3385                    old_values: vec![
3386                        ChangesetValue::Integer(1),
3387                        ChangesetValue::Text("alice".into()),
3388                    ],
3389                    new_values: vec![
3390                        ChangesetValue::Integer(2),
3391                        ChangesetValue::Text("ally".into()),
3392                    ],
3393                }],
3394            }],
3395        };
3396
3397        let outcome = target.apply(&cs, |_, _| ConflictAction::Replace);
3398        assert_eq!(
3399            outcome,
3400            ApplyOutcome::Success {
3401                applied: 1,
3402                skipped: 0,
3403            }
3404        );
3405        assert_eq!(
3406            target.tables["t"],
3407            vec![vec![
3408                SqliteValue::Integer(2),
3409                SqliteValue::Text("ally".into()),
3410            ]]
3411        );
3412    }
3413
3414    #[test]
3415    fn test_apply_delete_data_conflict_replace_removes() {
3416        let mut target = SimpleTarget::default();
3417        target.tables.insert(
3418            "t".to_owned(),
3419            vec![vec![
3420                SqliteValue::Integer(1),
3421                SqliteValue::Text("actual".into()),
3422            ]],
3423        );
3424
3425        let cs = Changeset {
3426            tables: vec![TableChangeset {
3427                info: TableInfo {
3428                    name: "t".to_owned(),
3429                    column_count: 2,
3430                    pk_flags: vec![true, false],
3431                },
3432                rows: vec![ChangesetRow {
3433                    op: ChangeOp::Delete,
3434                    old_values: vec![
3435                        ChangesetValue::Integer(1),
3436                        ChangesetValue::Text("expected".into()),
3437                    ],
3438                    new_values: Vec::new(),
3439                }],
3440            }],
3441        };
3442
3443        let outcome = target.apply(&cs, |_, _| ConflictAction::Replace);
3444        assert_eq!(
3445            outcome,
3446            ApplyOutcome::Success {
3447                applied: 1,
3448                skipped: 0
3449            }
3450        );
3451        assert!(target.tables["t"].is_empty());
3452    }
3453
3454    #[test]
3455    fn test_apply_update_not_found_abort() {
3456        let mut target = SimpleTarget::default();
3457        let cs = Changeset {
3458            tables: vec![TableChangeset {
3459                info: TableInfo {
3460                    name: "t".to_owned(),
3461                    column_count: 1,
3462                    pk_flags: vec![true],
3463                },
3464                rows: vec![ChangesetRow {
3465                    op: ChangeOp::Update,
3466                    old_values: vec![ChangesetValue::Integer(1)],
3467                    new_values: vec![ChangesetValue::Integer(2)],
3468                }],
3469            }],
3470        };
3471        let outcome = target.apply(&cs, |_, _| ConflictAction::Abort);
3472        assert_eq!(outcome, ApplyOutcome::Aborted { applied: 0 });
3473    }
3474
3475    #[test]
3476    fn test_apply_delete_not_found_replace_aborts() {
3477        let mut target = SimpleTarget::default();
3478        let cs = Changeset {
3479            tables: vec![TableChangeset {
3480                info: TableInfo {
3481                    name: "t".to_owned(),
3482                    column_count: 1,
3483                    pk_flags: vec![true],
3484                },
3485                rows: vec![ChangesetRow {
3486                    op: ChangeOp::Delete,
3487                    old_values: vec![ChangesetValue::Integer(1)],
3488                    new_values: Vec::new(),
3489                }],
3490            }],
3491        };
3492        let outcome = target.apply(&cs, |_, _| ConflictAction::Replace);
3493        assert_eq!(outcome, ApplyOutcome::Aborted { applied: 0 });
3494        assert!(target.tables["t"].is_empty());
3495    }
3496
3497    #[test]
3498    fn test_apply_update_not_found_replace_aborts() {
3499        let mut target = SimpleTarget::default();
3500        let cs = Changeset {
3501            tables: vec![TableChangeset {
3502                info: TableInfo {
3503                    name: "t".to_owned(),
3504                    column_count: 1,
3505                    pk_flags: vec![true],
3506                },
3507                rows: vec![ChangesetRow {
3508                    op: ChangeOp::Update,
3509                    old_values: vec![ChangesetValue::Integer(1)],
3510                    new_values: vec![ChangesetValue::Integer(2)],
3511                }],
3512            }],
3513        };
3514        let outcome = target.apply(&cs, |_, _| ConflictAction::Replace);
3515        assert_eq!(outcome, ApplyOutcome::Aborted { applied: 0 });
3516        assert!(target.tables["t"].is_empty());
3517    }
3518
3519    #[test]
3520    fn test_apply_abort_rolls_back_prior_successes() {
3521        let mut target = SimpleTarget::default();
3522        let cs = Changeset {
3523            tables: vec![TableChangeset {
3524                info: TableInfo {
3525                    name: "t".to_owned(),
3526                    column_count: 1,
3527                    pk_flags: vec![true],
3528                },
3529                rows: vec![
3530                    ChangesetRow {
3531                        op: ChangeOp::Insert,
3532                        old_values: Vec::new(),
3533                        new_values: vec![ChangesetValue::Integer(1)],
3534                    },
3535                    ChangesetRow {
3536                        op: ChangeOp::Delete,
3537                        old_values: vec![ChangesetValue::Integer(2)],
3538                        new_values: Vec::new(),
3539                    },
3540                ],
3541            }],
3542        };
3543        let outcome = target.apply(&cs, |_, _| ConflictAction::Abort);
3544        assert_eq!(outcome, ApplyOutcome::Aborted { applied: 1 });
3545        assert!(target.tables["t"].is_empty());
3546    }
3547
3548    #[test]
3549    fn test_apply_multiple_rows_mixed() {
3550        let mut target = SimpleTarget::default();
3551        let cs = Changeset {
3552            tables: vec![TableChangeset {
3553                info: TableInfo {
3554                    name: "t".to_owned(),
3555                    column_count: 2,
3556                    pk_flags: vec![true, false],
3557                },
3558                rows: vec![
3559                    ChangesetRow {
3560                        op: ChangeOp::Insert,
3561                        old_values: Vec::new(),
3562                        new_values: vec![
3563                            ChangesetValue::Integer(1),
3564                            ChangesetValue::Text("a".into()),
3565                        ],
3566                    },
3567                    ChangesetRow {
3568                        op: ChangeOp::Insert,
3569                        old_values: Vec::new(),
3570                        new_values: vec![
3571                            ChangesetValue::Integer(2),
3572                            ChangesetValue::Text("b".into()),
3573                        ],
3574                    },
3575                ],
3576            }],
3577        };
3578        let outcome = target.apply(&cs, |_, _| ConflictAction::Abort);
3579        assert_eq!(
3580            outcome,
3581            ApplyOutcome::Success {
3582                applied: 2,
3583                skipped: 0
3584            }
3585        );
3586        assert_eq!(target.tables["t"].len(), 2);
3587    }
3588
3589    #[test]
3590    fn test_apply_insert_without_pk_uses_full_row_identity() {
3591        let mut target = SimpleTarget::default();
3592        let cs = Changeset {
3593            tables: vec![TableChangeset {
3594                info: TableInfo {
3595                    name: "t".to_owned(),
3596                    column_count: 2,
3597                    pk_flags: vec![false, false],
3598                },
3599                rows: vec![
3600                    ChangesetRow {
3601                        op: ChangeOp::Insert,
3602                        old_values: Vec::new(),
3603                        new_values: vec![
3604                            ChangesetValue::Integer(1),
3605                            ChangesetValue::Text("a".into()),
3606                        ],
3607                    },
3608                    ChangesetRow {
3609                        op: ChangeOp::Insert,
3610                        old_values: Vec::new(),
3611                        new_values: vec![
3612                            ChangesetValue::Integer(2),
3613                            ChangesetValue::Text("b".into()),
3614                        ],
3615                    },
3616                    ChangesetRow {
3617                        op: ChangeOp::Insert,
3618                        old_values: Vec::new(),
3619                        new_values: vec![
3620                            ChangesetValue::Integer(1),
3621                            ChangesetValue::Text("a".into()),
3622                        ],
3623                    },
3624                ],
3625            }],
3626        };
3627
3628        let outcome = target.apply(&cs, |_, _| ConflictAction::OmitChange);
3629        assert_eq!(
3630            outcome,
3631            ApplyOutcome::Success {
3632                applied: 2,
3633                skipped: 1
3634            }
3635        );
3636        assert_eq!(target.tables["t"].len(), 2);
3637        assert_eq!(
3638            target.tables["t"],
3639            vec![
3640                vec![SqliteValue::Integer(1), SqliteValue::Text("a".into())],
3641                vec![SqliteValue::Integer(2), SqliteValue::Text("b".into())],
3642            ]
3643        );
3644    }
3645
3646    #[test]
3647    fn test_apply_empty_changeset() {
3648        let mut target = SimpleTarget::default();
3649        let cs = Changeset::new();
3650        let outcome = target.apply(&cs, |_, _| ConflictAction::Abort);
3651        assert_eq!(
3652            outcome,
3653            ApplyOutcome::Success {
3654                applied: 0,
3655                skipped: 0
3656            }
3657        );
3658    }
3659
3660    // -----------------------------------------------------------------------
3661    // TableChangeset encoding
3662    // -----------------------------------------------------------------------
3663
3664    #[test]
3665    fn test_table_changeset_encode_patchset() {
3666        let tc = TableChangeset {
3667            info: TableInfo {
3668                name: "t".to_owned(),
3669                column_count: 2,
3670                pk_flags: vec![true, false],
3671            },
3672            rows: vec![ChangesetRow {
3673                op: ChangeOp::Insert,
3674                old_values: Vec::new(),
3675                new_values: vec![ChangesetValue::Integer(1), ChangesetValue::Null],
3676            }],
3677        };
3678        let mut cs_buf = Vec::new();
3679        tc.encode_changeset(&mut cs_buf);
3680        let mut ps_buf = Vec::new();
3681        tc.encode_patchset(&mut ps_buf);
3682        assert_eq!(cs_buf[0], CHANGESET_TABLE_HEADER_BYTE);
3683        assert_eq!(ps_buf[0], PATCHSET_TABLE_HEADER_BYTE);
3684        assert_eq!(cs_buf[1..], ps_buf[1..]);
3685    }
3686
3687    // -----------------------------------------------------------------------
3688    // changeset_varint_len
3689    // -----------------------------------------------------------------------
3690
3691    #[test]
3692    fn test_changeset_varint_len_values() {
3693        assert_eq!(changeset_varint_len(0), 1);
3694        assert_eq!(changeset_varint_len(127), 1);
3695        assert_eq!(changeset_varint_len(128), 2);
3696        assert!(changeset_varint_len(u64::MAX) > 0);
3697    }
3698
3699    // -----------------------------------------------------------------------
3700    // ConflictType / ConflictAction traits
3701    // -----------------------------------------------------------------------
3702
3703    #[test]
3704    fn test_conflict_type_eq() {
3705        assert_eq!(ConflictType::Data, ConflictType::Data);
3706        assert_ne!(ConflictType::Data, ConflictType::NotFound);
3707        assert_ne!(ConflictType::Conflict, ConflictType::Constraint);
3708        assert_ne!(ConflictType::Constraint, ConflictType::ForeignKey);
3709    }
3710
3711    #[test]
3712    fn test_conflict_action_eq() {
3713        assert_eq!(ConflictAction::OmitChange, ConflictAction::OmitChange);
3714        assert_ne!(ConflictAction::OmitChange, ConflictAction::Replace);
3715        assert_ne!(ConflictAction::Replace, ConflictAction::Abort);
3716    }
3717
3718    #[test]
3719    fn test_conflict_type_debug() {
3720        assert_eq!(format!("{:?}", ConflictType::ForeignKey), "ForeignKey");
3721    }
3722
3723    // -----------------------------------------------------------------------
3724    // Extension name
3725    // -----------------------------------------------------------------------
3726
3727    #[test]
3728    fn test_extension_name_value() {
3729        assert_eq!(extension_name(), "session");
3730    }
3731
3732    // -----------------------------------------------------------------------
3733    // ApplyOutcome
3734    // -----------------------------------------------------------------------
3735
3736    #[test]
3737    fn test_apply_outcome_debug() {
3738        let outcome = ApplyOutcome::Success {
3739            applied: 5,
3740            skipped: 2,
3741        };
3742        let s = format!("{:?}", outcome);
3743        assert!(s.contains('5'));
3744        assert!(s.contains('2'));
3745    }
3746
3747    #[test]
3748    fn test_apply_outcome_aborted_eq() {
3749        assert_eq!(
3750            ApplyOutcome::Aborted { applied: 3 },
3751            ApplyOutcome::Aborted { applied: 3 }
3752        );
3753        assert_ne!(
3754            ApplyOutcome::Aborted { applied: 3 },
3755            ApplyOutcome::Aborted { applied: 4 }
3756        );
3757    }
3758}