Skip to main content

layer_client/
session_backend.rs

1//! Pluggable session storage backend.
2//!
3//! # What changed vs the original
4//!
5//! | Before | After |
6//! |---|---|
7//! | Only `save(PersistedSession)` + `load()`: full round-trip for every change | New `update_dc`, `set_home_dc`, `update_state` allow granular writes |
8//! | All methods sync (`io::Result`) | New methods are `async` (optional; default impls fall back to save/load) |
9//! | No way to update a single DC key without touching everything else | `update_dc` only rewrites what changed |
10//!
11//! # Backward compatibility
12//!
13//! The existing `SessionBackend` trait is unchanged. The new methods have
14//! default implementations that call `load` → mutate → `save`, so existing
15//! backends (`BinaryFileBackend`, `InMemoryBackend`, `SqliteBackend`,
16//! `LibSqlBackend`) continue to compile and work without modification.
17//!
18//! High-performance backends (e.g. a future Redis backend) can override the
19//! granular methods to avoid the load/save round-trip.
20//!
21//! # Ported from
22//!
23//! ' `Session` trait (in `-session/src/session.rs`) exposes:
24//! - `home_dc_id() -> i32`                           : cheap sync read
25//! - `set_home_dc_id(dc_id) -> BoxFuture<'_, ()>`    : async write
26//! - `dc_option(dc_id) -> Option<DcOption>`          : cheap sync read
27//! - `set_dc_option(&DcOption) -> BoxFuture<'_, ()>` : async write
28//! - `updates_state() -> BoxFuture<UpdatesState>`     : async read
29//! - `set_update_state(UpdateState) -> BoxFuture<()>`: fine-grained async write
30//!
31//! We adopt the same pattern while keeping layer's `PersistedSession` struct.
32
33use std::io;
34use std::path::PathBuf;
35
36use crate::session::{CachedPeer, DcEntry, PersistedSession, UpdatesStateSnap};
37
38// Core trait (unchanged)
39
40/// Synchronous snapshot backend: saves and loads the full session at once.
41///
42/// All built-in backends implement this. Higher-level code should prefer the
43/// extension methods below (`update_dc`, `set_home_dc`, `update_state`) which
44/// avoid unnecessary full-snapshot writes.
45pub trait SessionBackend: Send + Sync {
46    fn save(&self, session: &PersistedSession) -> io::Result<()>;
47    fn load(&self) -> io::Result<Option<PersistedSession>>;
48    fn delete(&self) -> io::Result<()>;
49
50    /// Human-readable name for logging/debug output.
51    fn name(&self) -> &str;
52
53    // Granular helpers (default: load → mutate → save)
54    //
55    // These default implementations are correct but not optimal.
56    // Backends that store data in a database (SQLite, libsql, Redis) should
57    // override them to issue single-row UPDATE statements instead.
58
59    /// Update a single DC entry without rewriting the entire session.
60    ///
61    /// Typically called after:
62    /// - completing a DH handshake on a new DC (to persist its auth key)
63    /// - receiving updated DC addresses from `help.getConfig`
64    ///
65    /// Ported from  `Session::set_dc_option`.
66    fn update_dc(&self, entry: &DcEntry) -> io::Result<()> {
67        let mut s = self.load()?.unwrap_or_default();
68        // Replace existing entry or append
69        if let Some(existing) = s.dcs.iter_mut().find(|d| d.dc_id == entry.dc_id) {
70            *existing = entry.clone();
71        } else {
72            s.dcs.push(entry.clone());
73        }
74        self.save(&s)
75    }
76
77    /// Change the home DC without touching any other session data.
78    ///
79    /// Called after a successful `*_MIGRATE` redirect: the user's account
80    /// now lives on a different DC.
81    ///
82    /// Ported from  `Session::set_home_dc_id`.
83    fn set_home_dc(&self, dc_id: i32) -> io::Result<()> {
84        let mut s = self.load()?.unwrap_or_default();
85        s.home_dc_id = dc_id;
86        self.save(&s)
87    }
88
89    /// Apply a single update-sequence change without a full save/load.
90    ///
91    /// Ported from  `Session::set_update_state(UpdateState)`.
92    ///
93    /// `update` is the new partial or full state to merge in.
94    fn apply_update_state(&self, update: UpdateStateChange) -> io::Result<()> {
95        let mut s = self.load()?.unwrap_or_default();
96        update.apply_to(&mut s.updates_state);
97        self.save(&s)
98    }
99
100    /// Cache a peer access hash without a full session save.
101    ///
102    /// This is lossy-on-default (full round-trip) but correct.
103    /// Override in SQL backends to issue a single `INSERT OR REPLACE`.
104    ///
105    /// Ported from  `Session::cache_peer`.
106    fn cache_peer(&self, peer: &CachedPeer) -> io::Result<()> {
107        let mut s = self.load()?.unwrap_or_default();
108        if let Some(existing) = s.peers.iter_mut().find(|p| p.id == peer.id) {
109            *existing = peer.clone();
110        } else {
111            s.peers.push(peer.clone());
112        }
113        self.save(&s)
114    }
115}
116
117// UpdateStateChange (mirrors  UpdateState enum)
118
119/// A single update-sequence change, applied via [`SessionBackend::apply_update_state`].
120///
121///uses:
122/// ```text
123/// UpdateState::All(updates_state)
124/// UpdateState::Primary { pts, date, seq }
125/// UpdateState::Secondary { qts }
126/// UpdateState::Channel { id, pts }
127/// ```
128///
129/// We map this 1-to-1 to layer's `UpdatesStateSnap`.
130#[derive(Debug, Clone)]
131pub enum UpdateStateChange {
132    /// Replace the entire state snapshot.
133    All(UpdatesStateSnap),
134    /// Update main sequence counters only (non-channel).
135    Primary { pts: i32, date: i32, seq: i32 },
136    /// Update the QTS counter (secret chats).
137    Secondary { qts: i32 },
138    /// Update the PTS for a specific channel.
139    Channel { id: i64, pts: i32 },
140}
141
142impl UpdateStateChange {
143    /// Apply `self` to `snap` in-place.
144    pub fn apply_to(&self, snap: &mut UpdatesStateSnap) {
145        match self {
146            Self::All(new_snap) => *snap = new_snap.clone(),
147            Self::Primary { pts, date, seq } => {
148                snap.pts = *pts;
149                snap.date = *date;
150                snap.seq = *seq;
151            }
152            Self::Secondary { qts } => {
153                snap.qts = *qts;
154            }
155            Self::Channel { id, pts } => {
156                // Replace or insert per-channel pts
157                if let Some(existing) = snap.channels.iter_mut().find(|c| c.0 == *id) {
158                    existing.1 = *pts;
159                } else {
160                    snap.channels.push((*id, *pts));
161                }
162            }
163        }
164    }
165}
166
167// BinaryFileBackend
168
169/// Stores the session in a compact binary file (v2 format).
170pub struct BinaryFileBackend {
171    path: PathBuf,
172}
173
174impl BinaryFileBackend {
175    pub fn new(path: impl Into<PathBuf>) -> Self {
176        Self { path: path.into() }
177    }
178
179    pub fn path(&self) -> &std::path::Path {
180        &self.path
181    }
182}
183
184impl SessionBackend for BinaryFileBackend {
185    fn save(&self, session: &PersistedSession) -> io::Result<()> {
186        session.save(&self.path)
187    }
188
189    fn load(&self) -> io::Result<Option<PersistedSession>> {
190        if !self.path.exists() {
191            return Ok(None);
192        }
193        match PersistedSession::load(&self.path) {
194            Ok(s) => Ok(Some(s)),
195            Err(e) => {
196                let bak = self.path.with_extension("bak");
197                tracing::warn!(
198                    "[layer] Session file {:?} is corrupt ({e}); \
199                     renaming to {:?} and starting fresh",
200                    self.path,
201                    bak
202                );
203                let _ = std::fs::rename(&self.path, &bak);
204                Ok(None)
205            }
206        }
207    }
208
209    fn delete(&self) -> io::Result<()> {
210        if self.path.exists() {
211            std::fs::remove_file(&self.path)?;
212        }
213        Ok(())
214    }
215
216    fn name(&self) -> &str {
217        "binary-file"
218    }
219
220    // BinaryFileBackend: the default granular impls (load→mutate→save) are
221    // fine since the format is a single compact binary blob. No override needed.
222}
223
224// InMemoryBackend
225
226/// Ephemeral in-process session: nothing persisted to disk.
227///
228/// Override the granular methods to skip the clone overhead of the full
229/// snapshot path (we're already in memory, so direct field mutations are
230/// cheaper than clone→mutate→replace).
231#[derive(Default)]
232pub struct InMemoryBackend {
233    data: std::sync::Mutex<Option<PersistedSession>>,
234}
235
236impl InMemoryBackend {
237    pub fn new() -> Self {
238        Self::default()
239    }
240
241    /// Test helper: get a snapshot of the current in-memory state.
242    pub fn snapshot(&self) -> Option<PersistedSession> {
243        self.data.lock().unwrap().clone()
244    }
245}
246
247impl SessionBackend for InMemoryBackend {
248    fn save(&self, s: &PersistedSession) -> io::Result<()> {
249        *self.data.lock().unwrap() = Some(s.clone());
250        Ok(())
251    }
252
253    fn load(&self) -> io::Result<Option<PersistedSession>> {
254        Ok(self.data.lock().unwrap().clone())
255    }
256
257    fn delete(&self) -> io::Result<()> {
258        *self.data.lock().unwrap() = None;
259        Ok(())
260    }
261
262    fn name(&self) -> &str {
263        "in-memory"
264    }
265
266    // Granular overrides: cheaper than load→clone→save
267
268    fn update_dc(&self, entry: &DcEntry) -> io::Result<()> {
269        let mut guard = self.data.lock().unwrap();
270        let s = guard.get_or_insert_with(PersistedSession::default);
271        if let Some(existing) = s.dcs.iter_mut().find(|d| d.dc_id == entry.dc_id) {
272            *existing = entry.clone();
273        } else {
274            s.dcs.push(entry.clone());
275        }
276        Ok(())
277    }
278
279    fn set_home_dc(&self, dc_id: i32) -> io::Result<()> {
280        let mut guard = self.data.lock().unwrap();
281        let s = guard.get_or_insert_with(PersistedSession::default);
282        s.home_dc_id = dc_id;
283        Ok(())
284    }
285
286    fn apply_update_state(&self, update: UpdateStateChange) -> io::Result<()> {
287        let mut guard = self.data.lock().unwrap();
288        let s = guard.get_or_insert_with(PersistedSession::default);
289        update.apply_to(&mut s.updates_state);
290        Ok(())
291    }
292
293    fn cache_peer(&self, peer: &CachedPeer) -> io::Result<()> {
294        let mut guard = self.data.lock().unwrap();
295        let s = guard.get_or_insert_with(PersistedSession::default);
296        if let Some(existing) = s.peers.iter_mut().find(|p| p.id == peer.id) {
297            *existing = peer.clone();
298        } else {
299            s.peers.push(peer.clone());
300        }
301        Ok(())
302    }
303}
304
305// StringSessionBackend
306
307/// Portable base64 string session backend.
308pub struct StringSessionBackend {
309    data: std::sync::Mutex<String>,
310}
311
312impl StringSessionBackend {
313    pub fn new(s: impl Into<String>) -> Self {
314        Self {
315            data: std::sync::Mutex::new(s.into()),
316        }
317    }
318
319    pub fn current(&self) -> String {
320        self.data.lock().unwrap().clone()
321    }
322}
323
324impl SessionBackend for StringSessionBackend {
325    fn save(&self, session: &PersistedSession) -> io::Result<()> {
326        *self.data.lock().unwrap() = session.to_string();
327        Ok(())
328    }
329
330    fn load(&self) -> io::Result<Option<PersistedSession>> {
331        let s = self.data.lock().unwrap().clone();
332        if s.trim().is_empty() {
333            return Ok(None);
334        }
335        PersistedSession::from_string(&s).map(Some)
336    }
337
338    fn delete(&self) -> io::Result<()> {
339        *self.data.lock().unwrap() = String::new();
340        Ok(())
341    }
342
343    fn name(&self) -> &str {
344        "string-session"
345    }
346}
347
348// Tests
349
350#[cfg(test)]
351mod tests {
352    use super::*;
353
354    fn make_dc(id: i32) -> DcEntry {
355        DcEntry {
356            dc_id: id,
357            addr: format!("1.2.3.{id}:443"),
358            auth_key: None,
359            first_salt: 0,
360            time_offset: 0,
361            flags: DcFlags::NONE,
362        }
363    }
364
365    fn make_peer(id: i64, hash: i64) -> CachedPeer {
366        CachedPeer {
367            id,
368            access_hash: hash,
369            is_channel: false,
370        }
371    }
372
373    // InMemoryBackend: basic save/load
374
375    #[test]
376    fn inmemory_load_returns_none_when_empty() {
377        let b = InMemoryBackend::new();
378        assert!(b.load().unwrap().is_none());
379    }
380
381    #[test]
382    fn inmemory_save_then_load_round_trips() {
383        let b = InMemoryBackend::new();
384        let mut s = PersistedSession::default();
385        s.home_dc_id = 3;
386        s.dcs.push(make_dc(3));
387        b.save(&s).unwrap();
388
389        let loaded = b.load().unwrap().unwrap();
390        assert_eq!(loaded.home_dc_id, 3);
391        assert_eq!(loaded.dcs.len(), 1);
392    }
393
394    #[test]
395    fn inmemory_delete_clears_state() {
396        let b = InMemoryBackend::new();
397        let mut s = PersistedSession::default();
398        s.home_dc_id = 2;
399        b.save(&s).unwrap();
400        b.delete().unwrap();
401        assert!(b.load().unwrap().is_none());
402    }
403
404    // InMemoryBackend: granular methods
405
406    #[test]
407    fn inmemory_update_dc_inserts_new() {
408        let b = InMemoryBackend::new();
409        b.update_dc(&make_dc(4)).unwrap();
410        let s = b.snapshot().unwrap();
411        assert_eq!(s.dcs.len(), 1);
412        assert_eq!(s.dcs[0].dc_id, 4);
413    }
414
415    #[test]
416    fn inmemory_update_dc_replaces_existing() {
417        let b = InMemoryBackend::new();
418        b.update_dc(&make_dc(2)).unwrap();
419        let mut updated = make_dc(2);
420        updated.addr = "9.9.9.9:443".to_string();
421        b.update_dc(&updated).unwrap();
422
423        let s = b.snapshot().unwrap();
424        assert_eq!(s.dcs.len(), 1);
425        assert_eq!(s.dcs[0].addr, "9.9.9.9:443");
426    }
427
428    #[test]
429    fn inmemory_set_home_dc() {
430        let b = InMemoryBackend::new();
431        b.set_home_dc(5).unwrap();
432        assert_eq!(b.snapshot().unwrap().home_dc_id, 5);
433    }
434
435    #[test]
436    fn inmemory_cache_peer_inserts() {
437        let b = InMemoryBackend::new();
438        b.cache_peer(&make_peer(100, 0xdeadbeef)).unwrap();
439        let s = b.snapshot().unwrap();
440        assert_eq!(s.peers.len(), 1);
441        assert_eq!(s.peers[0].id, 100);
442    }
443
444    #[test]
445    fn inmemory_cache_peer_updates_existing() {
446        let b = InMemoryBackend::new();
447        b.cache_peer(&make_peer(100, 111)).unwrap();
448        b.cache_peer(&make_peer(100, 222)).unwrap();
449        let s = b.snapshot().unwrap();
450        assert_eq!(s.peers.len(), 1);
451        assert_eq!(s.peers[0].access_hash, 222);
452    }
453
454    // UpdateStateChange
455
456    #[test]
457    fn update_state_primary() {
458        let mut snap = UpdatesStateSnap {
459            pts: 0,
460            qts: 0,
461            date: 0,
462            seq: 0,
463            channels: vec![],
464        };
465        UpdateStateChange::Primary {
466            pts: 10,
467            date: 20,
468            seq: 30,
469        }
470        .apply_to(&mut snap);
471        assert_eq!(snap.pts, 10);
472        assert_eq!(snap.date, 20);
473        assert_eq!(snap.seq, 30);
474        assert_eq!(snap.qts, 0); // untouched
475    }
476
477    #[test]
478    fn update_state_secondary() {
479        let mut snap = UpdatesStateSnap {
480            pts: 5,
481            qts: 0,
482            date: 0,
483            seq: 0,
484            channels: vec![],
485        };
486        UpdateStateChange::Secondary { qts: 99 }.apply_to(&mut snap);
487        assert_eq!(snap.qts, 99);
488        assert_eq!(snap.pts, 5); // untouched
489    }
490
491    #[test]
492    fn update_state_channel_inserts() {
493        let mut snap = UpdatesStateSnap {
494            pts: 0,
495            qts: 0,
496            date: 0,
497            seq: 0,
498            channels: vec![],
499        };
500        UpdateStateChange::Channel { id: 12345, pts: 42 }.apply_to(&mut snap);
501        assert_eq!(snap.channels, vec![(12345, 42)]);
502    }
503
504    #[test]
505    fn update_state_channel_updates_existing() {
506        let mut snap = UpdatesStateSnap {
507            pts: 0,
508            qts: 0,
509            date: 0,
510            seq: 0,
511            channels: vec![(12345, 10), (67890, 5)],
512        };
513        UpdateStateChange::Channel { id: 12345, pts: 99 }.apply_to(&mut snap);
514        // First channel updated, second untouched
515        assert_eq!(snap.channels[0], (12345, 99));
516        assert_eq!(snap.channels[1], (67890, 5));
517    }
518
519    #[test]
520    fn apply_update_state_via_backend() {
521        let b = InMemoryBackend::new();
522        b.apply_update_state(UpdateStateChange::Primary {
523            pts: 7,
524            date: 8,
525            seq: 9,
526        })
527        .unwrap();
528        let s = b.snapshot().unwrap();
529        assert_eq!(s.updates_state.pts, 7);
530    }
531
532    // Default impl (BinaryFileBackend trait shape via InMemory smoke)
533
534    #[test]
535    fn default_update_dc_via_trait_object() {
536        let b: Box<dyn SessionBackend> = Box::new(InMemoryBackend::new());
537        b.update_dc(&make_dc(1)).unwrap();
538        b.update_dc(&make_dc(2)).unwrap();
539        // Can't call snapshot() on trait object, but save/load must be consistent
540        let loaded = b.load().unwrap().unwrap();
541        assert_eq!(loaded.dcs.len(), 2);
542    }
543
544    // IPv6 tests
545
546    fn make_dc_v6(id: i32) -> DcEntry {
547        DcEntry {
548            dc_id: id,
549            addr: format!("[2001:b28:f23d:f00{}::a]:443", id),
550            auth_key: None,
551            first_salt: 0,
552            time_offset: 0,
553            flags: DcFlags::IPV6,
554        }
555    }
556
557    #[test]
558    fn dc_entry_from_parts_ipv4() {
559        let dc = DcEntry::from_parts(1, "149.154.175.53", 443, DcFlags::NONE);
560        assert_eq!(dc.addr, "149.154.175.53:443");
561        assert!(!dc.is_ipv6());
562        let sa = dc.socket_addr().unwrap();
563        assert_eq!(sa.port(), 443);
564    }
565
566    #[test]
567    fn dc_entry_from_parts_ipv6() {
568        let dc = DcEntry::from_parts(2, "2001:b28:f23d:f001::a", 443, DcFlags::IPV6);
569        assert_eq!(dc.addr, "[2001:b28:f23d:f001::a]:443");
570        assert!(dc.is_ipv6());
571        let sa = dc.socket_addr().unwrap();
572        assert_eq!(sa.port(), 443);
573    }
574
575    #[test]
576    fn persisted_session_dc_for_prefers_ipv6() {
577        let mut s = PersistedSession::default();
578        s.dcs.push(make_dc(2)); // IPv4
579        s.dcs.push(make_dc_v6(2)); // IPv6
580
581        let v6 = s.dc_for(2, true).unwrap();
582        assert!(v6.is_ipv6());
583
584        let v4 = s.dc_for(2, false).unwrap();
585        assert!(!v4.is_ipv6());
586    }
587
588    #[test]
589    fn persisted_session_dc_for_falls_back_when_only_ipv4() {
590        let mut s = PersistedSession::default();
591        s.dcs.push(make_dc(3)); // IPv4 only
592
593        // Asking for IPv6 should fall back to IPv4
594        let dc = s.dc_for(3, true).unwrap();
595        assert!(!dc.is_ipv6());
596    }
597
598    #[test]
599    fn persisted_session_all_dcs_for_returns_both() {
600        let mut s = PersistedSession::default();
601        s.dcs.push(make_dc(1));
602        s.dcs.push(make_dc_v6(1));
603        s.dcs.push(make_dc(2));
604
605        assert_eq!(s.all_dcs_for(1).count(), 2);
606        assert_eq!(s.all_dcs_for(2).count(), 1);
607        assert_eq!(s.all_dcs_for(5).count(), 0);
608    }
609
610    #[test]
611    fn inmemory_ipv4_and_ipv6_coexist() {
612        let b = InMemoryBackend::new();
613        b.update_dc(&make_dc(2)).unwrap(); // IPv4
614        b.update_dc(&make_dc_v6(2)).unwrap(); // IPv6
615
616        let s = b.snapshot().unwrap();
617        // Both entries must survive they have different flags
618        assert_eq!(s.dcs.iter().filter(|d| d.dc_id == 2).count(), 2);
619    }
620
621    #[test]
622    fn binary_roundtrip_ipv4_and_ipv6() {
623        let mut s = PersistedSession::default();
624        s.home_dc_id = 2;
625        s.dcs.push(make_dc(2));
626        s.dcs.push(make_dc_v6(2));
627
628        let bytes = s.to_bytes();
629        let loaded = PersistedSession::from_bytes(&bytes).unwrap();
630        assert_eq!(loaded.dcs.len(), 2);
631        assert_eq!(loaded.dcs.iter().filter(|d| d.is_ipv6()).count(), 1);
632        assert_eq!(loaded.dcs.iter().filter(|d| !d.is_ipv6()).count(), 1);
633    }
634}
635
636// ─── SqliteBackend ────────────────────────────────────────────────────────────
637
638/// SQLite-backed session (via `rusqlite`).
639///
640/// Enabled with the `sqlite-session` Cargo feature.
641///
642/// # Schema
643///
644/// Five tables are created on first open (idempotent):
645///
646/// | Table          | Purpose                                          |
647/// |----------------|--------------------------------------------------|
648/// | `meta`         | `home_dc_id` and future scalar values            |
649/// | `dcs`          | One row per DC (auth key, address, flags, …)     |
650/// | `update_state` | Single-row pts / qts / date / seq                |
651/// | `channel_pts`  | Per-channel pts                                  |
652/// | `peers`        | Access-hash cache                                |
653///
654/// # Granular writes
655///
656/// All [`SessionBackend`] extension methods (`update_dc`, `set_home_dc`,
657/// `apply_update_state`, `cache_peer`) issue **single-row SQL statements**
658/// instead of the default load-mutate-save round-trip, so they are safe to
659/// call frequently (e.g. on every update batch) without performance concerns.
660#[cfg(feature = "sqlite-session")]
661pub struct SqliteBackend {
662    conn: std::sync::Mutex<rusqlite::Connection>,
663    label: String,
664}
665
666#[cfg(feature = "sqlite-session")]
667impl SqliteBackend {
668    const SCHEMA: &'static str = "
669        PRAGMA journal_mode = WAL;
670        PRAGMA synchronous  = NORMAL;
671
672        CREATE TABLE IF NOT EXISTS meta (
673            key   TEXT    PRIMARY KEY,
674            value INTEGER NOT NULL DEFAULT 0
675        );
676
677        CREATE TABLE IF NOT EXISTS dcs (
678            dc_id       INTEGER NOT NULL,
679            flags       INTEGER NOT NULL DEFAULT 0,
680            addr        TEXT    NOT NULL,
681            auth_key    BLOB,
682            first_salt  INTEGER NOT NULL DEFAULT 0,
683            time_offset INTEGER NOT NULL DEFAULT 0,
684            PRIMARY KEY (dc_id, flags)
685        );
686
687        CREATE TABLE IF NOT EXISTS update_state (
688            id   INTEGER PRIMARY KEY CHECK (id = 1),
689            pts  INTEGER NOT NULL DEFAULT 0,
690            qts  INTEGER NOT NULL DEFAULT 0,
691            date INTEGER NOT NULL DEFAULT 0,
692            seq  INTEGER NOT NULL DEFAULT 0
693        );
694
695        CREATE TABLE IF NOT EXISTS channel_pts (
696            channel_id INTEGER PRIMARY KEY,
697            pts        INTEGER NOT NULL
698        );
699
700        CREATE TABLE IF NOT EXISTS peers (
701            id           INTEGER PRIMARY KEY,
702            access_hash  INTEGER NOT NULL,
703            is_channel   INTEGER NOT NULL DEFAULT 0
704        );
705    ";
706
707    /// Open (or create) the SQLite database at `path`.
708    pub fn open(path: impl Into<PathBuf>) -> io::Result<Self> {
709        let path = path.into();
710        let label = path.display().to_string();
711        let conn = rusqlite::Connection::open(&path)
712            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
713        conn.execute_batch(Self::SCHEMA)
714            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
715        Ok(Self {
716            conn: std::sync::Mutex::new(conn),
717            label,
718        })
719    }
720
721    /// Open an in-process SQLite database (useful for tests).
722    pub fn in_memory() -> io::Result<Self> {
723        let conn = rusqlite::Connection::open_in_memory()
724            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
725        conn.execute_batch(Self::SCHEMA)
726            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
727        Ok(Self {
728            conn: std::sync::Mutex::new(conn),
729            label: ":memory:".into(),
730        })
731    }
732
733    fn map_err(e: rusqlite::Error) -> io::Error {
734        io::Error::new(io::ErrorKind::Other, e)
735    }
736
737    /// Read the full session out of the database.
738    fn read_session(conn: &rusqlite::Connection) -> io::Result<PersistedSession> {
739        // home_dc_id
740        let home_dc_id: i32 = conn
741            .query_row("SELECT value FROM meta WHERE key = 'home_dc_id'", [], |r| {
742                r.get(0)
743            })
744            .unwrap_or(0);
745
746        // dcs
747        let mut stmt = conn
748            .prepare("SELECT dc_id, flags, addr, auth_key, first_salt, time_offset FROM dcs")
749            .map_err(Self::map_err)?;
750        let dcs = stmt
751            .query_map([], |row| {
752                let dc_id: i32 = row.get(0)?;
753                let flags_raw: u8 = row.get(1)?;
754                let addr: String = row.get(2)?;
755                let key_blob: Option<Vec<u8>> = row.get(3)?;
756                let first_salt: i64 = row.get(4)?;
757                let time_offset: i32 = row.get(5)?;
758                Ok((dc_id, addr, key_blob, first_salt, time_offset, flags_raw))
759            })
760            .map_err(Self::map_err)?
761            .filter_map(|r| r.ok())
762            .map(
763                |(dc_id, addr, key_blob, first_salt, time_offset, flags_raw)| {
764                    let auth_key = key_blob.and_then(|b| {
765                        if b.len() == 256 {
766                            let mut k = [0u8; 256];
767                            k.copy_from_slice(&b);
768                            Some(k)
769                        } else {
770                            None
771                        }
772                    });
773                    DcEntry {
774                        dc_id,
775                        addr,
776                        auth_key,
777                        first_salt,
778                        time_offset,
779                        flags: DcFlags(flags_raw),
780                    }
781                },
782            )
783            .collect();
784
785        // update_state
786        let updates_state = conn
787            .query_row(
788                "SELECT pts, qts, date, seq FROM update_state WHERE id = 1",
789                [],
790                |r| {
791                    Ok(UpdatesStateSnap {
792                        pts: r.get(0)?,
793                        qts: r.get(1)?,
794                        date: r.get(2)?,
795                        seq: r.get(3)?,
796                        channels: vec![],
797                    })
798                },
799            )
800            .unwrap_or_default();
801
802        // channel_pts
803        let mut ch_stmt = conn
804            .prepare("SELECT channel_id, pts FROM channel_pts")
805            .map_err(Self::map_err)?;
806        let channels: Vec<(i64, i32)> = ch_stmt
807            .query_map([], |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i32>(1)?)))
808            .map_err(Self::map_err)?
809            .filter_map(|r| r.ok())
810            .collect();
811
812        // peers
813        let mut peer_stmt = conn
814            .prepare("SELECT id, access_hash, is_channel FROM peers")
815            .map_err(Self::map_err)?;
816        let peers: Vec<CachedPeer> = peer_stmt
817            .query_map([], |r| {
818                Ok(CachedPeer {
819                    id: r.get(0)?,
820                    access_hash: r.get(1)?,
821                    is_channel: r.get::<_, i32>(2)? != 0,
822                })
823            })
824            .map_err(Self::map_err)?
825            .filter_map(|r| r.ok())
826            .collect();
827
828        Ok(PersistedSession {
829            home_dc_id,
830            dcs,
831            updates_state: UpdatesStateSnap {
832                channels,
833                ..updates_state
834            },
835            peers,
836        })
837    }
838
839    /// Write the full session into the database inside a single transaction.
840    fn write_session(conn: &rusqlite::Connection, s: &PersistedSession) -> io::Result<()> {
841        conn.execute_batch("BEGIN IMMEDIATE")
842            .map_err(Self::map_err)?;
843
844        conn.execute(
845            "INSERT INTO meta (key, value) VALUES ('home_dc_id', ?1)
846             ON CONFLICT(key) DO UPDATE SET value = excluded.value",
847            rusqlite::params![s.home_dc_id],
848        )
849        .map_err(Self::map_err)?;
850
851        // Replace all DCs
852        conn.execute("DELETE FROM dcs", []).map_err(Self::map_err)?;
853        for d in &s.dcs {
854            conn.execute(
855                "INSERT INTO dcs (dc_id, flags, addr, auth_key, first_salt, time_offset)
856                 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
857                rusqlite::params![
858                    d.dc_id,
859                    d.flags.0,
860                    d.addr,
861                    d.auth_key.as_ref().map(|k| k.as_ref()),
862                    d.first_salt,
863                    d.time_offset,
864                ],
865            )
866            .map_err(Self::map_err)?;
867        }
868
869        // update_state
870        let us = &s.updates_state;
871        conn.execute(
872            "INSERT INTO update_state (id, pts, qts, date, seq) VALUES (1, ?1, ?2, ?3, ?4)
873             ON CONFLICT(id) DO UPDATE SET pts=excluded.pts, qts=excluded.qts,
874             date=excluded.date, seq=excluded.seq",
875            rusqlite::params![us.pts, us.qts, us.date, us.seq],
876        )
877        .map_err(Self::map_err)?;
878
879        conn.execute("DELETE FROM channel_pts", [])
880            .map_err(Self::map_err)?;
881        for &(cid, cpts) in &us.channels {
882            conn.execute(
883                "INSERT INTO channel_pts (channel_id, pts) VALUES (?1, ?2)",
884                rusqlite::params![cid, cpts],
885            )
886            .map_err(Self::map_err)?;
887        }
888
889        // peers
890        conn.execute("DELETE FROM peers", [])
891            .map_err(Self::map_err)?;
892        for p in &s.peers {
893            conn.execute(
894                "INSERT INTO peers (id, access_hash, is_channel) VALUES (?1, ?2, ?3)",
895                rusqlite::params![p.id, p.access_hash, p.is_channel as i32],
896            )
897            .map_err(Self::map_err)?;
898        }
899
900        conn.execute_batch("COMMIT").map_err(Self::map_err)
901    }
902}
903
904#[cfg(feature = "sqlite-session")]
905impl SessionBackend for SqliteBackend {
906    fn save(&self, session: &PersistedSession) -> io::Result<()> {
907        let conn = self.conn.lock().unwrap();
908        Self::write_session(&conn, session)
909    }
910
911    fn load(&self) -> io::Result<Option<PersistedSession>> {
912        let conn = self.conn.lock().unwrap();
913        // If meta table is empty, no session has been saved yet.
914        let count: i64 = conn
915            .query_row("SELECT COUNT(*) FROM meta", [], |r| r.get(0))
916            .map_err(Self::map_err)?;
917        if count == 0 {
918            return Ok(None);
919        }
920        Self::read_session(&conn).map(Some)
921    }
922
923    fn delete(&self) -> io::Result<()> {
924        let conn = self.conn.lock().unwrap();
925        conn.execute_batch(
926            "BEGIN IMMEDIATE;
927             DELETE FROM meta;
928             DELETE FROM dcs;
929             DELETE FROM update_state;
930             DELETE FROM channel_pts;
931             DELETE FROM peers;
932             COMMIT;",
933        )
934        .map_err(Self::map_err)
935    }
936
937    fn name(&self) -> &str {
938        &self.label
939    }
940
941    // ── Granular overrides (single-row SQL, no full round-trip) ──────────────
942
943    fn update_dc(&self, entry: &DcEntry) -> io::Result<()> {
944        let conn = self.conn.lock().unwrap();
945        conn.execute(
946            "INSERT INTO dcs (dc_id, flags, addr, auth_key, first_salt, time_offset)
947             VALUES (?1, ?6, ?2, ?3, ?4, ?5)
948             ON CONFLICT(dc_id, flags) DO UPDATE SET
949               addr        = excluded.addr,
950               auth_key    = excluded.auth_key,
951               first_salt  = excluded.first_salt,
952               time_offset = excluded.time_offset",
953            rusqlite::params![
954                entry.dc_id,
955                entry.addr,
956                entry.auth_key.as_ref().map(|k| k.as_ref()),
957                entry.first_salt,
958                entry.time_offset,
959                entry.flags.0,
960            ],
961        )
962        .map(|_| ())
963        .map_err(Self::map_err)
964    }
965
966    fn set_home_dc(&self, dc_id: i32) -> io::Result<()> {
967        let conn = self.conn.lock().unwrap();
968        conn.execute(
969            "INSERT INTO meta (key, value) VALUES ('home_dc_id', ?1)
970             ON CONFLICT(key) DO UPDATE SET value = excluded.value",
971            rusqlite::params![dc_id],
972        )
973        .map(|_| ())
974        .map_err(Self::map_err)
975    }
976
977    fn apply_update_state(&self, update: UpdateStateChange) -> io::Result<()> {
978        let conn = self.conn.lock().unwrap();
979        match update {
980            UpdateStateChange::All(snap) => {
981                conn.execute(
982                    "INSERT INTO update_state (id, pts, qts, date, seq) VALUES (1,?1,?2,?3,?4)
983                     ON CONFLICT(id) DO UPDATE SET
984                       pts=excluded.pts, qts=excluded.qts,
985                       date=excluded.date, seq=excluded.seq",
986                    rusqlite::params![snap.pts, snap.qts, snap.date, snap.seq],
987                )
988                .map_err(Self::map_err)?;
989                conn.execute("DELETE FROM channel_pts", [])
990                    .map_err(Self::map_err)?;
991                for &(cid, cpts) in &snap.channels {
992                    conn.execute(
993                        "INSERT INTO channel_pts (channel_id, pts) VALUES (?1, ?2)",
994                        rusqlite::params![cid, cpts],
995                    )
996                    .map_err(Self::map_err)?;
997                }
998                Ok(())
999            }
1000            UpdateStateChange::Primary { pts, date, seq } => conn
1001                .execute(
1002                    "INSERT INTO update_state (id, pts, qts, date, seq) VALUES (1,?1,0,?2,?3)
1003                     ON CONFLICT(id) DO UPDATE SET pts=excluded.pts, date=excluded.date,
1004                     seq=excluded.seq",
1005                    rusqlite::params![pts, date, seq],
1006                )
1007                .map(|_| ())
1008                .map_err(Self::map_err),
1009            UpdateStateChange::Secondary { qts } => conn
1010                .execute(
1011                    "INSERT INTO update_state (id, pts, qts, date, seq) VALUES (1,0,?1,0,0)
1012                     ON CONFLICT(id) DO UPDATE SET qts = excluded.qts",
1013                    rusqlite::params![qts],
1014                )
1015                .map(|_| ())
1016                .map_err(Self::map_err),
1017            UpdateStateChange::Channel { id, pts } => conn
1018                .execute(
1019                    "INSERT INTO channel_pts (channel_id, pts) VALUES (?1, ?2)
1020                     ON CONFLICT(channel_id) DO UPDATE SET pts = excluded.pts",
1021                    rusqlite::params![id, pts],
1022                )
1023                .map(|_| ())
1024                .map_err(Self::map_err),
1025        }
1026    }
1027
1028    fn cache_peer(&self, peer: &CachedPeer) -> io::Result<()> {
1029        let conn = self.conn.lock().unwrap();
1030        conn.execute(
1031            "INSERT INTO peers (id, access_hash, is_channel) VALUES (?1, ?2, ?3)
1032             ON CONFLICT(id) DO UPDATE SET
1033               access_hash = excluded.access_hash,
1034               is_channel  = excluded.is_channel",
1035            rusqlite::params![peer.id, peer.access_hash, peer.is_channel as i32],
1036        )
1037        .map(|_| ())
1038        .map_err(Self::map_err)
1039    }
1040}
1041
1042// ─── LibSqlBackend ────────────────────────────────────────────────────────────
1043
1044/// libSQL-backed session (Turso / embedded replica / in-process).
1045///
1046/// Enabled with the `libsql-session` Cargo feature.
1047///
1048/// The libSQL API is async; since [`SessionBackend`] methods are sync we
1049/// block via `tokio::runtime::Handle::current().block_on(…)`.  Always
1050/// call from inside a Tokio runtime (i.e. the same runtime as the rest of
1051/// `layer-client`).
1052///
1053/// # Connecting
1054///
1055/// | Mode              | Constructor                        |
1056/// |-------------------|------------------------------------|
1057/// | Local file        | `LibSqlBackend::open_local(path)`  |
1058/// | In-memory         | `LibSqlBackend::in_memory()`       |
1059/// | Turso remote      | `LibSqlBackend::open_remote(url, token)` |
1060/// | Embedded replica  | `LibSqlBackend::open_replica(path, url, token)` |
1061#[cfg(feature = "libsql-session")]
1062pub struct LibSqlBackend {
1063    conn: libsql::Connection,
1064    label: String,
1065}
1066
1067#[cfg(feature = "libsql-session")]
1068impl LibSqlBackend {
1069    const SCHEMA: &'static str = "
1070        CREATE TABLE IF NOT EXISTS meta (
1071            key   TEXT    PRIMARY KEY,
1072            value INTEGER NOT NULL DEFAULT 0
1073        );
1074        CREATE TABLE IF NOT EXISTS dcs (
1075            dc_id       INTEGER NOT NULL,
1076            flags       INTEGER NOT NULL DEFAULT 0,
1077            addr        TEXT    NOT NULL,
1078            auth_key    BLOB,
1079            first_salt  INTEGER NOT NULL DEFAULT 0,
1080            time_offset INTEGER NOT NULL DEFAULT 0,
1081            PRIMARY KEY (dc_id, flags)
1082        );
1083        CREATE TABLE IF NOT EXISTS update_state (
1084            id   INTEGER PRIMARY KEY CHECK (id = 1),
1085            pts  INTEGER NOT NULL DEFAULT 0,
1086            qts  INTEGER NOT NULL DEFAULT 0,
1087            date INTEGER NOT NULL DEFAULT 0,
1088            seq  INTEGER NOT NULL DEFAULT 0
1089        );
1090        CREATE TABLE IF NOT EXISTS channel_pts (
1091            channel_id INTEGER PRIMARY KEY,
1092            pts        INTEGER NOT NULL
1093        );
1094        CREATE TABLE IF NOT EXISTS peers (
1095            id          INTEGER PRIMARY KEY,
1096            access_hash INTEGER NOT NULL,
1097            is_channel  INTEGER NOT NULL DEFAULT 0
1098        );
1099    ";
1100
1101    fn block<F, T>(fut: F) -> io::Result<T>
1102    where
1103        F: std::future::Future<Output = Result<T, libsql::Error>>,
1104    {
1105        tokio::runtime::Handle::current()
1106            .block_on(fut)
1107            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
1108    }
1109
1110    async fn apply_schema(conn: &libsql::Connection) -> Result<(), libsql::Error> {
1111        conn.execute_batch(Self::SCHEMA).await
1112    }
1113
1114    /// Open a local file database.
1115    pub fn open_local(path: impl Into<PathBuf>) -> io::Result<Self> {
1116        let path = path.into();
1117        let label = path.display().to_string();
1118        let db = Self::block(async { libsql::Builder::new_local(path).build().await })?;
1119        let conn = Self::block(async { db.connect() })
1120            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
1121        Self::block(Self::apply_schema(&conn))?;
1122        Ok(Self {
1123            conn: std::sync::Arc::new(tokio::sync::Mutex::new(conn)),
1124            label,
1125        })
1126    }
1127
1128    /// Open an in-process in-memory database (useful for tests).
1129    pub fn in_memory() -> io::Result<Self> {
1130        let db = Self::block(async { libsql::Builder::new_local(":memory:").build().await })?;
1131        let conn = Self::block(async { db.connect() })
1132            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
1133        Self::block(Self::apply_schema(&conn))?;
1134        Ok(Self {
1135            conn: std::sync::Arc::new(tokio::sync::Mutex::new(conn)),
1136            label: ":memory:".into(),
1137        })
1138    }
1139
1140    /// Connect to a remote Turso database.
1141    pub fn open_remote(url: impl Into<String>, auth_token: impl Into<String>) -> io::Result<Self> {
1142        let url = url.into();
1143        let label = url.clone();
1144        let db = Self::block(async {
1145            libsql::Builder::new_remote(url, auth_token.into())
1146                .build()
1147                .await
1148        })?;
1149        let conn = Self::block(async { db.connect() })
1150            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
1151        Self::block(Self::apply_schema(&conn))?;
1152        Ok(Self {
1153            conn: std::sync::Arc::new(tokio::sync::Mutex::new(conn)),
1154            label,
1155        })
1156    }
1157
1158    /// Open an embedded replica (local file + Turso remote sync).
1159    pub fn open_replica(
1160        path: impl Into<PathBuf>,
1161        url: impl Into<String>,
1162        auth_token: impl Into<String>,
1163    ) -> io::Result<Self> {
1164        let path = path.into();
1165        let label = format!("{} (replica of {})", path.display(), url.into());
1166        let db = Self::block(async {
1167            libsql::Builder::new_remote_replica(path, url.into(), auth_token.into())
1168                .build()
1169                .await
1170        })?;
1171        let conn = Self::block(async { db.connect() })
1172            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
1173        Self::block(Self::apply_schema(&conn))?;
1174        Ok(Self {
1175            conn: std::sync::Arc::new(tokio::sync::Mutex::new(conn)),
1176            label,
1177        })
1178    }
1179
1180    async fn read_session_async(
1181        conn: &libsql::Connection,
1182    ) -> Result<PersistedSession, libsql::Error> {
1183        use libsql::de;
1184
1185        // home_dc_id
1186        let home_dc_id: i32 = conn
1187            .query("SELECT value FROM meta WHERE key = 'home_dc_id'", ())
1188            .await?
1189            .next()
1190            .await?
1191            .map(|r| r.get::<i32>(0))
1192            .transpose()?
1193            .unwrap_or(0);
1194
1195        // dcs
1196        let mut rows = conn
1197            .query(
1198                "SELECT dc_id, flags, addr, auth_key, first_salt, time_offset FROM dcs",
1199                (),
1200            )
1201            .await?;
1202        let mut dcs = Vec::new();
1203        while let Some(row) = rows.next().await? {
1204            let dc_id: i32 = row.get(0)?;
1205            let flags_raw: u8 = row.get::<i64>(1)? as u8;
1206            let addr: String = row.get(2)?;
1207            let key_blob: Option<Vec<u8>> = row.get(3)?;
1208            let first_salt: i64 = row.get(4)?;
1209            let time_offset: i32 = row.get(5)?;
1210            let auth_key = match key_blob {
1211                Some(b) if b.len() == 256 => {
1212                    let mut k = [0u8; 256];
1213                    k.copy_from_slice(&b);
1214                    Some(k)
1215                }
1216                Some(b) => {
1217                    return Err(libsql::Error::Misuse(format!(
1218                        "auth_key blob must be 256 bytes, got {}",
1219                        b.len()
1220                    )));
1221                }
1222                None => None,
1223            };
1224            dcs.push(DcEntry {
1225                dc_id,
1226                addr,
1227                auth_key,
1228                first_salt,
1229                time_offset,
1230                flags: DcFlags(flags_raw),
1231            });
1232        }
1233
1234        // update_state
1235        let mut us_row = conn
1236            .query(
1237                "SELECT pts, qts, date, seq FROM update_state WHERE id = 1",
1238                (),
1239            )
1240            .await?;
1241        let updates_state = if let Some(r) = us_row.next().await? {
1242            UpdatesStateSnap {
1243                pts: r.get(0)?,
1244                qts: r.get(1)?,
1245                date: r.get(2)?,
1246                seq: r.get(3)?,
1247                channels: vec![],
1248            }
1249        } else {
1250            UpdatesStateSnap::default()
1251        };
1252
1253        // channel_pts
1254        let mut ch_rows = conn
1255            .query("SELECT channel_id, pts FROM channel_pts", ())
1256            .await?;
1257        let mut channels = Vec::new();
1258        while let Some(r) = ch_rows.next().await? {
1259            channels.push((r.get::<i64>(0)?, r.get::<i32>(1)?));
1260        }
1261
1262        // peers
1263        let mut peer_rows = conn
1264            .query("SELECT id, access_hash, is_channel FROM peers", ())
1265            .await?;
1266        let mut peers = Vec::new();
1267        while let Some(r) = peer_rows.next().await? {
1268            peers.push(CachedPeer {
1269                id: r.get(0)?,
1270                access_hash: r.get(1)?,
1271                is_channel: r.get::<i32>(2)? != 0,
1272            });
1273        }
1274
1275        Ok(PersistedSession {
1276            home_dc_id,
1277            dcs,
1278            updates_state: UpdatesStateSnap {
1279                channels,
1280                ..updates_state
1281            },
1282            peers,
1283        })
1284    }
1285
1286    async fn write_session_async(
1287        conn: &libsql::Connection,
1288        s: &PersistedSession,
1289    ) -> Result<(), libsql::Error> {
1290        conn.execute_batch("BEGIN IMMEDIATE").await?;
1291
1292        conn.execute(
1293            "INSERT INTO meta (key, value) VALUES ('home_dc_id', ?1)
1294             ON CONFLICT(key) DO UPDATE SET value = excluded.value",
1295            libsql::params![s.home_dc_id],
1296        )
1297        .await?;
1298
1299        conn.execute("DELETE FROM dcs", ()).await?;
1300        for d in &s.dcs {
1301            conn.execute(
1302                "INSERT INTO dcs (dc_id, flags, addr, auth_key, first_salt, time_offset)
1303                 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1304                libsql::params![
1305                    d.dc_id,
1306                    d.flags.0 as i64,
1307                    d.addr.clone(),
1308                    d.auth_key.map(|k| k.to_vec()),
1309                    d.first_salt,
1310                    d.time_offset,
1311                ],
1312            )
1313            .await?;
1314        }
1315
1316        let us = &s.updates_state;
1317        conn.execute(
1318            "INSERT INTO update_state (id, pts, qts, date, seq) VALUES (1,?1,?2,?3,?4)
1319             ON CONFLICT(id) DO UPDATE SET pts=excluded.pts,qts=excluded.qts,
1320             date=excluded.date,seq=excluded.seq",
1321            libsql::params![us.pts, us.qts, us.date, us.seq],
1322        )
1323        .await?;
1324
1325        conn.execute("DELETE FROM channel_pts", ()).await?;
1326        for &(cid, cpts) in &us.channels {
1327            conn.execute(
1328                "INSERT INTO channel_pts (channel_id, pts) VALUES (?1,?2)",
1329                libsql::params![cid, cpts],
1330            )
1331            .await?;
1332        }
1333
1334        conn.execute("DELETE FROM peers", ()).await?;
1335        for p in &s.peers {
1336            conn.execute(
1337                "INSERT INTO peers (id, access_hash, is_channel) VALUES (?1,?2,?3)",
1338                libsql::params![p.id, p.access_hash, p.is_channel as i32],
1339            )
1340            .await?;
1341        }
1342
1343        conn.execute_batch("COMMIT").await
1344    }
1345}
1346
1347#[cfg(feature = "libsql-session")]
1348impl SessionBackend for LibSqlBackend {
1349    fn save(&self, session: &PersistedSession) -> io::Result<()> {
1350        let conn = self.conn.clone();
1351        let session = session.clone();
1352        Self::block(async move {
1353            let conn = conn.lock().await;
1354            Self::write_session_async(&conn, session).await
1355        })
1356    }
1357
1358    fn load(&self) -> io::Result<Option<PersistedSession>> {
1359        let conn = self.conn.clone();
1360        let count: i64 = Self::block(async move {
1361            let conn = conn.lock().await;
1362            let mut rows = conn.query("SELECT COUNT(*) FROM meta", ()).await?;
1363            Ok::<i64, libsql::Error>(rows.next().await?.and_then(|r| r.get(0).ok()).unwrap_or(0))
1364        })?;
1365        if count == 0 {
1366            return Ok(None);
1367        }
1368        let conn = self.conn.clone();
1369        Self::block(async move {
1370            let conn = conn.lock().await;
1371            Self::read_session_async(&conn).await
1372        })
1373        .map(Some)
1374    }
1375
1376    fn delete(&self) -> io::Result<()> {
1377        let conn = self.conn.clone();
1378        Self::block(async move {
1379            let conn = conn.lock().await;
1380            conn.execute_batch(
1381                "BEGIN IMMEDIATE;
1382                 DELETE FROM meta;
1383                 DELETE FROM dcs;
1384                 DELETE FROM update_state;
1385                 DELETE FROM channel_pts;
1386                 DELETE FROM peers;
1387                 COMMIT;",
1388            )
1389            .await
1390        })
1391    }
1392
1393    fn name(&self) -> &str {
1394        &self.label
1395    }
1396
1397    // ── Granular overrides ───────────────────────────────────────────────────
1398
1399    fn update_dc(&self, entry: &DcEntry) -> io::Result<()> {
1400        let conn = self.conn.clone();
1401        let (dc_id, addr, key, salt, off, flags) = (
1402            entry.dc_id,
1403            entry.addr.clone(),
1404            entry.auth_key.map(|k| k.to_vec()),
1405            entry.first_salt,
1406            entry.time_offset,
1407            entry.flags.0 as i64,
1408        );
1409        Self::block(async move {
1410            let conn = conn.lock().await;
1411            conn.execute(
1412                "INSERT INTO dcs (dc_id, flags, addr, auth_key, first_salt, time_offset)
1413                 VALUES (?1,?6,?2,?3,?4,?5)
1414                 ON CONFLICT(dc_id, flags) DO UPDATE SET
1415                   addr=excluded.addr, auth_key=excluded.auth_key,
1416                   first_salt=excluded.first_salt, time_offset=excluded.time_offset",
1417                libsql::params![dc_id, addr, key, salt, off, flags],
1418            )
1419            .await
1420            .map(|_| ())
1421        })
1422    }
1423
1424    fn set_home_dc(&self, dc_id: i32) -> io::Result<()> {
1425        let conn = self.conn.clone();
1426        Self::block(async move {
1427            let conn = conn.lock().await;
1428            conn.execute(
1429                "INSERT INTO meta (key, value) VALUES ('home_dc_id',?1)
1430                 ON CONFLICT(key) DO UPDATE SET value=excluded.value",
1431                libsql::params![dc_id],
1432            )
1433            .await
1434            .map(|_| ())
1435        })
1436    }
1437
1438    fn apply_update_state(&self, update: UpdateStateChange) -> io::Result<()> {
1439        let conn = self.conn.clone();
1440        Self::block(async move {
1441            let conn = conn.lock().await;
1442            match update {
1443                UpdateStateChange::All(snap) => {
1444                    conn.execute(
1445                        "INSERT INTO update_state (id,pts,qts,date,seq) VALUES (1,?1,?2,?3,?4)
1446                         ON CONFLICT(id) DO UPDATE SET pts=excluded.pts,qts=excluded.qts,
1447                         date=excluded.date,seq=excluded.seq",
1448                        libsql::params![snap.pts, snap.qts, snap.date, snap.seq],
1449                    )
1450                    .await?;
1451                    conn.execute("DELETE FROM channel_pts", ()).await?;
1452                    for &(cid, cpts) in &snap.channels {
1453                        conn.execute(
1454                            "INSERT INTO channel_pts (channel_id,pts) VALUES (?1,?2)",
1455                            libsql::params![cid, cpts],
1456                        )
1457                        .await?;
1458                    }
1459                    Ok(())
1460                }
1461                UpdateStateChange::Primary { pts, date, seq } => conn
1462                    .execute(
1463                        "INSERT INTO update_state (id,pts,qts,date,seq) VALUES (1,?1,0,?2,?3)
1464                         ON CONFLICT(id) DO UPDATE SET pts=excluded.pts,date=excluded.date,
1465                         seq=excluded.seq",
1466                        libsql::params![pts, date, seq],
1467                    )
1468                    .await
1469                    .map(|_| ()),
1470                UpdateStateChange::Secondary { qts } => conn
1471                    .execute(
1472                        "INSERT INTO update_state (id,pts,qts,date,seq) VALUES (1,0,?1,0,0)
1473                         ON CONFLICT(id) DO UPDATE SET qts=excluded.qts",
1474                        libsql::params![qts],
1475                    )
1476                    .await
1477                    .map(|_| ()),
1478                UpdateStateChange::Channel { id, pts } => conn
1479                    .execute(
1480                        "INSERT INTO channel_pts (channel_id,pts) VALUES (?1,?2)
1481                         ON CONFLICT(channel_id) DO UPDATE SET pts=excluded.pts",
1482                        libsql::params![id, pts],
1483                    )
1484                    .await
1485                    .map(|_| ()),
1486            }
1487        })
1488    }
1489
1490    fn cache_peer(&self, peer: &CachedPeer) -> io::Result<()> {
1491        let conn = self.conn.clone();
1492        let (id, hash, is_ch) = (peer.id, peer.access_hash, peer.is_channel as i32);
1493        Self::block(async move {
1494            let conn = conn.lock().await;
1495            conn.execute(
1496                "INSERT INTO peers (id,access_hash,is_channel) VALUES (?1,?2,?3)
1497                 ON CONFLICT(id) DO UPDATE SET
1498                   access_hash=excluded.access_hash,
1499                   is_channel=excluded.is_channel",
1500                libsql::params![id, hash, is_ch],
1501            )
1502            .await
1503            .map(|_| ())
1504        })
1505    }
1506}