Skip to main content

layer_client/
session_backend.rs

1// Copyright (c) Ankit Chaubey <ankitchaubey.dev@gmail.com>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4// NOTE:
5// The "Layer" project is no longer maintained or supported.
6// Its original purpose for personal SDK/APK experimentation and learning
7// has been fulfilled.
8//
9// Please use Ferogram instead:
10// https://github.com/ankit-chaubey/ferogram
11// Ferogram will receive future updates and development, although progress
12// may be slower.
13//
14// Ferogram is an async Telegram MTProto client library written in Rust.
15// Its implementation follows the behaviour of the official Telegram clients,
16// particularly Telegram Desktop and TDLib, and aims to provide a clean and
17// modern async interface for building Telegram clients and tools.
18
19//! Pluggable session storage backend.
20//!
21//! # What changed vs the original
22//!
23//! | Before | After |
24//! |---|---|
//! | Only `save(PersistedSession)` + `load()`: full round-trip for every change | New `update_dc`, `set_home_dc`, `apply_update_state`, `cache_peer` allow granular writes |
//! | All methods sync (`io::Result`) | New methods are sync too (`io::Result`) and optional: default impls fall back to load/save |
27//! | No way to update a single DC key without touching everything else | `update_dc` only rewrites what changed |
28//!
29//! # Backward compatibility
30//!
31//! The existing `SessionBackend` trait is unchanged. The new methods have
32//! default implementations that call `load` → mutate → `save`, so existing
33//! backends (`BinaryFileBackend`, `InMemoryBackend`, `SqliteBackend`,
34//! `LibSqlBackend`) continue to compile and work without modification.
35//!
36//! High-performance backends (e.g. a future Redis backend) can override the
37//! granular methods to avoid the load/save round-trip.
38//!
39//! # Ported from
40//!
//! grammers' `Session` trait (in `grammers-session/src/session.rs`) exposes:
42//! - `home_dc_id() -> i32`                           : cheap sync read
43//! - `set_home_dc_id(dc_id) -> BoxFuture<'_, ()>`    : async write
44//! - `dc_option(dc_id) -> Option<DcOption>`          : cheap sync read
45//! - `set_dc_option(&DcOption) -> BoxFuture<'_, ()>` : async write
46//! - `updates_state() -> BoxFuture<UpdatesState>`     : async read
47//! - `set_update_state(UpdateState) -> BoxFuture<()>`: fine-grained async write
48//!
49//! We adopt the same pattern while keeping layer's `PersistedSession` struct.
50
51use std::io;
52use std::path::PathBuf;
53
54use crate::session::{CachedPeer, DcEntry, PersistedSession, UpdatesStateSnap};
55
56// Core trait (unchanged)
57
58/// Synchronous snapshot backend: saves and loads the full session at once.
59///
60/// All built-in backends implement this. Higher-level code should prefer the
61/// extension methods below (`update_dc`, `set_home_dc`, `update_state`) which
62/// avoid unnecessary full-snapshot writes.
63pub trait SessionBackend: Send + Sync {
64    fn save(&self, session: &PersistedSession) -> io::Result<()>;
65    fn load(&self) -> io::Result<Option<PersistedSession>>;
66    fn delete(&self) -> io::Result<()>;
67
68    /// Human-readable name for logging/debug output.
69    fn name(&self) -> &str;
70
71    // Granular helpers (default: load → mutate → save)
72    //
73    // These default implementations are correct but not optimal.
74    // Backends that store data in a database (SQLite, libsql, Redis) should
75    // override them to issue single-row UPDATE statements instead.
76
77    /// Update a single DC entry without rewriting the entire session.
78    ///
79    /// Typically called after:
80    /// - completing a DH handshake on a new DC (to persist its auth key)
81    /// - receiving updated DC addresses from `help.getConfig`
82    ///
83    /// Ported from  `Session::set_dc_option`.
84    fn update_dc(&self, entry: &DcEntry) -> io::Result<()> {
85        let mut s = self.load()?.unwrap_or_default();
86        // Replace existing entry or append
87        if let Some(existing) = s.dcs.iter_mut().find(|d| d.dc_id == entry.dc_id) {
88            *existing = entry.clone();
89        } else {
90            s.dcs.push(entry.clone());
91        }
92        self.save(&s)
93    }
94
95    /// Change the home DC without touching any other session data.
96    ///
97    /// Called after a successful `*_MIGRATE` redirect: the user's account
98    /// now lives on a different DC.
99    ///
100    /// Ported from  `Session::set_home_dc_id`.
101    fn set_home_dc(&self, dc_id: i32) -> io::Result<()> {
102        let mut s = self.load()?.unwrap_or_default();
103        s.home_dc_id = dc_id;
104        self.save(&s)
105    }
106
107    /// Apply a single update-sequence change without a full save/load.
108    ///
109    /// Ported from  `Session::set_update_state(UpdateState)`.
110    ///
111    /// `update` is the new partial or full state to merge in.
112    fn apply_update_state(&self, update: UpdateStateChange) -> io::Result<()> {
113        let mut s = self.load()?.unwrap_or_default();
114        update.apply_to(&mut s.updates_state);
115        self.save(&s)
116    }
117
118    /// Cache a peer access hash without a full session save.
119    ///
120    /// This is lossy-on-default (full round-trip) but correct.
121    /// Override in SQL backends to issue a single `INSERT OR REPLACE`.
122    ///
123    /// Ported from  `Session::cache_peer`.
124    fn cache_peer(&self, peer: &CachedPeer) -> io::Result<()> {
125        let mut s = self.load()?.unwrap_or_default();
126        if let Some(existing) = s.peers.iter_mut().find(|p| p.id == peer.id) {
127            *existing = peer.clone();
128        } else {
129            s.peers.push(peer.clone());
130        }
131        self.save(&s)
132    }
133}
134
135// UpdateStateChange (mirrors  UpdateState enum)
136
/// A single update-sequence change, applied via [`SessionBackend::apply_update_state`].
///
/// The upstream `UpdateState` enum this mirrors has the variants:
/// ```text
/// UpdateState::All(updates_state)
/// UpdateState::Primary { pts, date, seq }
/// UpdateState::Secondary { qts }
/// UpdateState::Channel { id, pts }
/// ```
///
/// We map this 1-to-1 to layer's `UpdatesStateSnap`; see
/// [`UpdateStateChange::apply_to`] for how each variant is merged in.
#[derive(Debug, Clone)]
pub enum UpdateStateChange {
    /// Replace the entire state snapshot.
    All(UpdatesStateSnap),
    /// Update main sequence counters only (non-channel): `pts`, `date` and
    /// `seq` are overwritten, `qts` and the per-channel list are left alone.
    Primary { pts: i32, date: i32, seq: i32 },
    /// Update the QTS counter (secret chats).
    Secondary { qts: i32 },
    /// Update the PTS for a specific channel; a channel not yet tracked is
    /// appended to the per-channel list.
    Channel { id: i64, pts: i32 },
}
159
160impl UpdateStateChange {
161    /// Apply `self` to `snap` in-place.
162    pub fn apply_to(&self, snap: &mut UpdatesStateSnap) {
163        match self {
164            Self::All(new_snap) => *snap = new_snap.clone(),
165            Self::Primary { pts, date, seq } => {
166                snap.pts = *pts;
167                snap.date = *date;
168                snap.seq = *seq;
169            }
170            Self::Secondary { qts } => {
171                snap.qts = *qts;
172            }
173            Self::Channel { id, pts } => {
174                // Replace or insert per-channel pts
175                if let Some(existing) = snap.channels.iter_mut().find(|c| c.0 == *id) {
176                    existing.1 = *pts;
177                } else {
178                    snap.channels.push((*id, *pts));
179                }
180            }
181        }
182    }
183}
184
185// BinaryFileBackend
186
/// File-based session backend: the whole session lives in one compact
/// binary file (v2 format).
pub struct BinaryFileBackend {
    /// Location of the session file on disk.
    path: PathBuf,
}

impl BinaryFileBackend {
    /// Create a backend bound to `path`. Nothing is read or written here.
    pub fn new(path: impl Into<PathBuf>) -> Self {
        let path: PathBuf = path.into();
        Self { path }
    }

    /// The session file location this backend was created with.
    pub fn path(&self) -> &std::path::Path {
        self.path.as_path()
    }
}
201
202impl SessionBackend for BinaryFileBackend {
203    fn save(&self, session: &PersistedSession) -> io::Result<()> {
204        session.save(&self.path)
205    }
206
207    fn load(&self) -> io::Result<Option<PersistedSession>> {
208        if !self.path.exists() {
209            return Ok(None);
210        }
211        match PersistedSession::load(&self.path) {
212            Ok(s) => Ok(Some(s)),
213            Err(e) => {
214                let bak = self.path.with_extension("bak");
215                tracing::warn!(
216                    "[layer] Session file {:?} is corrupt ({e}); \
217                     renaming to {:?} and starting fresh",
218                    self.path,
219                    bak
220                );
221                let _ = std::fs::rename(&self.path, &bak);
222                Ok(None)
223            }
224        }
225    }
226
227    fn delete(&self) -> io::Result<()> {
228        if self.path.exists() {
229            std::fs::remove_file(&self.path)?;
230        }
231        Ok(())
232    }
233
234    fn name(&self) -> &str {
235        "binary-file"
236    }
237
238    // BinaryFileBackend: the default granular impls (load→mutate→save) are
239    // fine since the format is a single compact binary blob. No override needed.
240}
241
242// InMemoryBackend
243
244/// Ephemeral in-process session: nothing persisted to disk.
245///
246/// Override the granular methods to skip the clone overhead of the full
247/// snapshot path (we're already in memory, so direct field mutations are
248/// cheaper than clone→mutate→replace).
249#[derive(Default)]
250pub struct InMemoryBackend {
251    data: std::sync::Mutex<Option<PersistedSession>>,
252}
253
254impl InMemoryBackend {
255    pub fn new() -> Self {
256        Self::default()
257    }
258
259    /// Test helper: get a snapshot of the current in-memory state.
260    pub fn snapshot(&self) -> Option<PersistedSession> {
261        self.data.lock().unwrap().clone()
262    }
263}
264
265impl SessionBackend for InMemoryBackend {
266    fn save(&self, s: &PersistedSession) -> io::Result<()> {
267        *self.data.lock().unwrap() = Some(s.clone());
268        Ok(())
269    }
270
271    fn load(&self) -> io::Result<Option<PersistedSession>> {
272        Ok(self.data.lock().unwrap().clone())
273    }
274
275    fn delete(&self) -> io::Result<()> {
276        *self.data.lock().unwrap() = None;
277        Ok(())
278    }
279
280    fn name(&self) -> &str {
281        "in-memory"
282    }
283
284    // Granular overrides: cheaper than load→clone→save
285
286    fn update_dc(&self, entry: &DcEntry) -> io::Result<()> {
287        let mut guard = self.data.lock().unwrap();
288        let s = guard.get_or_insert_with(PersistedSession::default);
289        if let Some(existing) = s.dcs.iter_mut().find(|d| d.dc_id == entry.dc_id) {
290            *existing = entry.clone();
291        } else {
292            s.dcs.push(entry.clone());
293        }
294        Ok(())
295    }
296
297    fn set_home_dc(&self, dc_id: i32) -> io::Result<()> {
298        let mut guard = self.data.lock().unwrap();
299        let s = guard.get_or_insert_with(PersistedSession::default);
300        s.home_dc_id = dc_id;
301        Ok(())
302    }
303
304    fn apply_update_state(&self, update: UpdateStateChange) -> io::Result<()> {
305        let mut guard = self.data.lock().unwrap();
306        let s = guard.get_or_insert_with(PersistedSession::default);
307        update.apply_to(&mut s.updates_state);
308        Ok(())
309    }
310
311    fn cache_peer(&self, peer: &CachedPeer) -> io::Result<()> {
312        let mut guard = self.data.lock().unwrap();
313        let s = guard.get_or_insert_with(PersistedSession::default);
314        if let Some(existing) = s.peers.iter_mut().find(|p| p.id == peer.id) {
315            *existing = peer.clone();
316        } else {
317            s.peers.push(peer.clone());
318        }
319        Ok(())
320    }
321}
322
323// StringSessionBackend
324
/// Portable base64 string session backend.
///
/// The encoded session is held in memory only; read it back with
/// [`StringSessionBackend::current`].
pub struct StringSessionBackend {
    /// Encoded session text; empty when no session is stored.
    data: std::sync::Mutex<String>,
}

impl StringSessionBackend {
    /// Create a backend seeded with the encoded session `s`.
    pub fn new(s: impl Into<String>) -> Self {
        let encoded: String = s.into();
        Self {
            data: std::sync::Mutex::new(encoded),
        }
    }

    /// Return a copy of the currently stored session string.
    pub fn current(&self) -> String {
        let guard = self.data.lock().unwrap();
        String::clone(&guard)
    }
}
341
342impl SessionBackend for StringSessionBackend {
343    fn save(&self, session: &PersistedSession) -> io::Result<()> {
344        *self.data.lock().unwrap() = session.to_string();
345        Ok(())
346    }
347
348    fn load(&self) -> io::Result<Option<PersistedSession>> {
349        let s = self.data.lock().unwrap().clone();
350        if s.trim().is_empty() {
351            return Ok(None);
352        }
353        PersistedSession::from_string(&s).map(Some)
354    }
355
356    fn delete(&self) -> io::Result<()> {
357        *self.data.lock().unwrap() = String::new();
358        Ok(())
359    }
360
361    fn name(&self) -> &str {
362        "string-session"
363    }
364}
365
366// Tests
367
#[cfg(test)]
mod tests {
    use super::*;
    // `DcFlags` is not part of the parent module's visible imports, so bring
    // it in explicitly (an explicit `use` safely shadows a glob import).
    use crate::session::DcFlags;

    /// IPv4 DC entry with no auth key, addressed as `1.2.3.<id>:443`.
    fn make_dc(id: i32) -> DcEntry {
        DcEntry {
            dc_id: id,
            addr: format!("1.2.3.{id}:443"),
            auth_key: None,
            first_salt: 0,
            time_offset: 0,
            flags: DcFlags::NONE,
        }
    }

    /// Non-channel cached peer with the given id and access hash.
    fn make_peer(id: i64, hash: i64) -> CachedPeer {
        CachedPeer {
            id,
            access_hash: hash,
            is_channel: false,
        }
    }

    /// Backend that implements only the four required trait methods, so every
    /// granular call goes through the trait's default load → mutate → save
    /// path (unlike `InMemoryBackend`, which overrides all of them).
    #[derive(Default)]
    struct SnapshotOnlyBackend {
        data: std::sync::Mutex<Option<PersistedSession>>,
    }

    impl SessionBackend for SnapshotOnlyBackend {
        fn save(&self, session: &PersistedSession) -> io::Result<()> {
            *self.data.lock().unwrap() = Some(session.clone());
            Ok(())
        }

        fn load(&self) -> io::Result<Option<PersistedSession>> {
            Ok(self.data.lock().unwrap().clone())
        }

        fn delete(&self) -> io::Result<()> {
            *self.data.lock().unwrap() = None;
            Ok(())
        }

        fn name(&self) -> &str {
            "snapshot-only"
        }
    }

    // InMemoryBackend: basic save/load

    #[test]
    fn inmemory_load_returns_none_when_empty() {
        let b = InMemoryBackend::new();
        assert!(b.load().unwrap().is_none());
    }

    #[test]
    fn inmemory_save_then_load_round_trips() {
        let b = InMemoryBackend::new();
        let mut s = PersistedSession::default();
        s.home_dc_id = 3;
        s.dcs.push(make_dc(3));
        b.save(&s).unwrap();

        let loaded = b.load().unwrap().unwrap();
        assert_eq!(loaded.home_dc_id, 3);
        assert_eq!(loaded.dcs.len(), 1);
    }

    #[test]
    fn inmemory_delete_clears_state() {
        let b = InMemoryBackend::new();
        let mut s = PersistedSession::default();
        s.home_dc_id = 2;
        b.save(&s).unwrap();
        b.delete().unwrap();
        assert!(b.load().unwrap().is_none());
    }

    // InMemoryBackend: granular methods

    #[test]
    fn inmemory_update_dc_inserts_new() {
        let b = InMemoryBackend::new();
        b.update_dc(&make_dc(4)).unwrap();
        let s = b.snapshot().unwrap();
        assert_eq!(s.dcs.len(), 1);
        assert_eq!(s.dcs[0].dc_id, 4);
    }

    #[test]
    fn inmemory_update_dc_replaces_existing() {
        let b = InMemoryBackend::new();
        b.update_dc(&make_dc(2)).unwrap();
        let mut updated = make_dc(2);
        updated.addr = "9.9.9.9:443".to_string();
        b.update_dc(&updated).unwrap();

        let s = b.snapshot().unwrap();
        assert_eq!(s.dcs.len(), 1);
        assert_eq!(s.dcs[0].addr, "9.9.9.9:443");
    }

    #[test]
    fn inmemory_set_home_dc() {
        let b = InMemoryBackend::new();
        b.set_home_dc(5).unwrap();
        assert_eq!(b.snapshot().unwrap().home_dc_id, 5);
    }

    #[test]
    fn inmemory_cache_peer_inserts() {
        let b = InMemoryBackend::new();
        b.cache_peer(&make_peer(100, 0xdeadbeef)).unwrap();
        let s = b.snapshot().unwrap();
        assert_eq!(s.peers.len(), 1);
        assert_eq!(s.peers[0].id, 100);
    }

    #[test]
    fn inmemory_cache_peer_updates_existing() {
        let b = InMemoryBackend::new();
        b.cache_peer(&make_peer(100, 111)).unwrap();
        b.cache_peer(&make_peer(100, 222)).unwrap();
        let s = b.snapshot().unwrap();
        assert_eq!(s.peers.len(), 1);
        assert_eq!(s.peers[0].access_hash, 222);
    }

    // UpdateStateChange

    #[test]
    fn update_state_primary() {
        let mut snap = UpdatesStateSnap {
            pts: 0,
            qts: 0,
            date: 0,
            seq: 0,
            channels: vec![],
        };
        UpdateStateChange::Primary {
            pts: 10,
            date: 20,
            seq: 30,
        }
        .apply_to(&mut snap);
        assert_eq!(snap.pts, 10);
        assert_eq!(snap.date, 20);
        assert_eq!(snap.seq, 30);
        assert_eq!(snap.qts, 0); // untouched
    }

    #[test]
    fn update_state_secondary() {
        let mut snap = UpdatesStateSnap {
            pts: 5,
            qts: 0,
            date: 0,
            seq: 0,
            channels: vec![],
        };
        UpdateStateChange::Secondary { qts: 99 }.apply_to(&mut snap);
        assert_eq!(snap.qts, 99);
        assert_eq!(snap.pts, 5); // untouched
    }

    #[test]
    fn update_state_channel_inserts() {
        let mut snap = UpdatesStateSnap {
            pts: 0,
            qts: 0,
            date: 0,
            seq: 0,
            channels: vec![],
        };
        UpdateStateChange::Channel { id: 12345, pts: 42 }.apply_to(&mut snap);
        assert_eq!(snap.channels, vec![(12345, 42)]);
    }

    #[test]
    fn update_state_channel_updates_existing() {
        let mut snap = UpdatesStateSnap {
            pts: 0,
            qts: 0,
            date: 0,
            seq: 0,
            channels: vec![(12345, 10), (67890, 5)],
        };
        UpdateStateChange::Channel { id: 12345, pts: 99 }.apply_to(&mut snap);
        // First channel updated, second untouched
        assert_eq!(snap.channels[0], (12345, 99));
        assert_eq!(snap.channels[1], (67890, 5));
    }

    #[test]
    fn apply_update_state_via_backend() {
        let b = InMemoryBackend::new();
        b.apply_update_state(UpdateStateChange::Primary {
            pts: 7,
            date: 8,
            seq: 9,
        })
        .unwrap();
        let s = b.snapshot().unwrap();
        assert_eq!(s.updates_state.pts, 7);
    }

    // Dynamic dispatch: granular calls through `dyn SessionBackend` must stay
    // consistent with `load`. (InMemoryBackend overrides `update_dc`, so this
    // exercises the override, not the trait default.)

    #[test]
    fn default_update_dc_via_trait_object() {
        let b: Box<dyn SessionBackend> = Box::new(InMemoryBackend::new());
        b.update_dc(&make_dc(1)).unwrap();
        b.update_dc(&make_dc(2)).unwrap();
        // Can't call snapshot() on trait object, but save/load must be consistent
        let loaded = b.load().unwrap().unwrap();
        assert_eq!(loaded.dcs.len(), 2);
    }

    // Trait default impls (load → mutate → save), via a backend that
    // overrides none of the granular methods.

    #[test]
    fn default_impls_update_dc_inserts_and_replaces() {
        let b = SnapshotOnlyBackend::default();
        b.update_dc(&make_dc(1)).unwrap();
        b.update_dc(&make_dc(2)).unwrap();
        let mut moved = make_dc(2);
        moved.addr = "9.9.9.9:443".to_string();
        b.update_dc(&moved).unwrap();

        let s = b.load().unwrap().unwrap();
        assert_eq!(s.dcs.len(), 2);
        assert_eq!(s.dcs[0].dc_id, 1);
        assert_eq!(s.dcs[1].addr, "9.9.9.9:443");
    }

    #[test]
    fn default_impls_set_home_dc_cache_peer_and_update_state() {
        let b = SnapshotOnlyBackend::default();
        b.set_home_dc(4).unwrap();
        b.cache_peer(&make_peer(100, 111)).unwrap();
        b.cache_peer(&make_peer(100, 222)).unwrap();
        b.apply_update_state(UpdateStateChange::Secondary { qts: 9 })
            .unwrap();

        let s = b.load().unwrap().unwrap();
        assert_eq!(s.home_dc_id, 4);
        assert_eq!(s.peers.len(), 1);
        assert_eq!(s.peers[0].access_hash, 222);
        assert_eq!(s.updates_state.qts, 9);
    }

    // IPv6 tests

    /// IPv6 counterpart of `make_dc`: same DC id, `DcFlags::IPV6` set.
    fn make_dc_v6(id: i32) -> DcEntry {
        DcEntry {
            dc_id: id,
            addr: format!("[2001:b28:f23d:f00{}::a]:443", id),
            auth_key: None,
            first_salt: 0,
            time_offset: 0,
            flags: DcFlags::IPV6,
        }
    }

    #[test]
    fn dc_entry_from_parts_ipv4() {
        let dc = DcEntry::from_parts(1, "149.154.175.53", 443, DcFlags::NONE);
        assert_eq!(dc.addr, "149.154.175.53:443");
        assert!(!dc.is_ipv6());
        let sa = dc.socket_addr().unwrap();
        assert_eq!(sa.port(), 443);
    }

    #[test]
    fn dc_entry_from_parts_ipv6() {
        let dc = DcEntry::from_parts(2, "2001:b28:f23d:f001::a", 443, DcFlags::IPV6);
        assert_eq!(dc.addr, "[2001:b28:f23d:f001::a]:443");
        assert!(dc.is_ipv6());
        let sa = dc.socket_addr().unwrap();
        assert_eq!(sa.port(), 443);
    }

    #[test]
    fn persisted_session_dc_for_prefers_ipv6() {
        let mut s = PersistedSession::default();
        s.dcs.push(make_dc(2)); // IPv4
        s.dcs.push(make_dc_v6(2)); // IPv6

        let v6 = s.dc_for(2, true).unwrap();
        assert!(v6.is_ipv6());

        let v4 = s.dc_for(2, false).unwrap();
        assert!(!v4.is_ipv6());
    }

    #[test]
    fn persisted_session_dc_for_falls_back_when_only_ipv4() {
        let mut s = PersistedSession::default();
        s.dcs.push(make_dc(3)); // IPv4 only

        // Asking for IPv6 should fall back to IPv4
        let dc = s.dc_for(3, true).unwrap();
        assert!(!dc.is_ipv6());
    }

    #[test]
    fn persisted_session_all_dcs_for_returns_both() {
        let mut s = PersistedSession::default();
        s.dcs.push(make_dc(1));
        s.dcs.push(make_dc_v6(1));
        s.dcs.push(make_dc(2));

        assert_eq!(s.all_dcs_for(1).count(), 2);
        assert_eq!(s.all_dcs_for(2).count(), 1);
        assert_eq!(s.all_dcs_for(5).count(), 0);
    }

    #[test]
    fn inmemory_ipv4_and_ipv6_coexist() {
        let b = InMemoryBackend::new();
        b.update_dc(&make_dc(2)).unwrap(); // IPv4
        b.update_dc(&make_dc_v6(2)).unwrap(); // IPv6

        let s = b.snapshot().unwrap();
        // Both entries must survive: they have different flags
        assert_eq!(s.dcs.iter().filter(|d| d.dc_id == 2).count(), 2);
    }

    #[test]
    fn binary_roundtrip_ipv4_and_ipv6() {
        let mut s = PersistedSession::default();
        s.home_dc_id = 2;
        s.dcs.push(make_dc(2));
        s.dcs.push(make_dc_v6(2));

        let bytes = s.to_bytes();
        let loaded = PersistedSession::from_bytes(&bytes).unwrap();
        assert_eq!(loaded.dcs.len(), 2);
        assert_eq!(loaded.dcs.iter().filter(|d| d.is_ipv6()).count(), 1);
        assert_eq!(loaded.dcs.iter().filter(|d| !d.is_ipv6()).count(), 1);
    }
}
653
654// ─── SqliteBackend ────────────────────────────────────────────────────────────
655
/// SQLite-backed session (via `rusqlite`).
///
/// Enabled with the `sqlite-session` Cargo feature.
///
/// # Schema
///
/// Five tables are created on first open (idempotent, `CREATE TABLE IF NOT
/// EXISTS`):
///
/// | Table          | Purpose                                          |
/// |----------------|--------------------------------------------------|
/// | `meta`         | `home_dc_id` and future scalar values            |
/// | `dcs`          | One row per DC (auth key, address, flags, …)     |
/// | `update_state` | Single-row pts / qts / date / seq                |
/// | `channel_pts`  | Per-channel pts                                  |
/// | `peers`        | Access-hash cache                                |
///
/// Rows in `dcs` are keyed by `(dc_id, flags)`, so one DC can have several
/// entries that differ only in their flags.
///
/// # Granular writes
///
/// All [`SessionBackend`] extension methods (`update_dc`, `set_home_dc`,
/// `apply_update_state`, `cache_peer`) issue **single-row SQL statements**
/// instead of the default load-mutate-save round-trip, so they are safe to
/// call frequently (e.g. on every update batch) without performance concerns.
#[cfg(feature = "sqlite-session")]
pub struct SqliteBackend {
    /// The database connection; the mutex serialises all access to it.
    conn: std::sync::Mutex<rusqlite::Connection>,
    /// Value returned by `name()`: the database path, or `:memory:` for an
    /// in-memory database.
    label: String,
}
683
#[cfg(feature = "sqlite-session")]
impl SqliteBackend {
    /// Pragmas and idempotent DDL executed every time a database is opened.
    const SCHEMA: &'static str = "
        PRAGMA journal_mode = WAL;
        PRAGMA synchronous  = NORMAL;

        CREATE TABLE IF NOT EXISTS meta (
            key   TEXT    PRIMARY KEY,
            value INTEGER NOT NULL DEFAULT 0
        );

        CREATE TABLE IF NOT EXISTS dcs (
            dc_id       INTEGER NOT NULL,
            flags       INTEGER NOT NULL DEFAULT 0,
            addr        TEXT    NOT NULL,
            auth_key    BLOB,
            first_salt  INTEGER NOT NULL DEFAULT 0,
            time_offset INTEGER NOT NULL DEFAULT 0,
            PRIMARY KEY (dc_id, flags)
        );

        CREATE TABLE IF NOT EXISTS update_state (
            id   INTEGER PRIMARY KEY CHECK (id = 1),
            pts  INTEGER NOT NULL DEFAULT 0,
            qts  INTEGER NOT NULL DEFAULT 0,
            date INTEGER NOT NULL DEFAULT 0,
            seq  INTEGER NOT NULL DEFAULT 0
        );

        CREATE TABLE IF NOT EXISTS channel_pts (
            channel_id INTEGER PRIMARY KEY,
            pts        INTEGER NOT NULL
        );

        CREATE TABLE IF NOT EXISTS peers (
            id           INTEGER PRIMARY KEY,
            access_hash  INTEGER NOT NULL,
            is_channel   INTEGER NOT NULL DEFAULT 0
        );
    ";

    /// Open (or create) the SQLite database at `path`.
    ///
    /// # Errors
    ///
    /// Returns an `io::Error` wrapping the `rusqlite` error when the database
    /// cannot be opened or the schema cannot be applied.
    pub fn open(path: impl Into<PathBuf>) -> io::Result<Self> {
        let path = path.into();
        let conn = rusqlite::Connection::open(&path).map_err(Self::map_err)?;
        Self::from_connection(conn, path.display().to_string())
    }

    /// Open an in-process SQLite database (useful for tests).
    ///
    /// # Errors
    ///
    /// Returns an `io::Error` wrapping the `rusqlite` error when the database
    /// cannot be created or the schema cannot be applied.
    pub fn in_memory() -> io::Result<Self> {
        let conn = rusqlite::Connection::open_in_memory().map_err(Self::map_err)?;
        Self::from_connection(conn, ":memory:".into())
    }

    /// Apply the schema to a freshly opened connection and wrap it.
    fn from_connection(conn: rusqlite::Connection, label: String) -> io::Result<Self> {
        conn.execute_batch(Self::SCHEMA).map_err(Self::map_err)?;
        Ok(Self {
            conn: std::sync::Mutex::new(conn),
            label,
        })
    }

    /// Wrap a `rusqlite` error in an `io::Error` of kind `Other`.
    fn map_err(e: rusqlite::Error) -> io::Error {
        io::Error::new(io::ErrorKind::Other, e)
    }

    /// Read the full session out of the database.
    ///
    /// A missing `home_dc_id` row yields `0` and a missing `update_state` row
    /// yields the default state. Every other query or row-decoding error is
    /// returned to the caller: silently skipping a row here would make the
    /// next full `save` (which deletes and re-inserts all rows) drop it for
    /// good.
    fn read_session(conn: &rusqlite::Connection) -> io::Result<PersistedSession> {
        // home_dc_id
        let home_dc_id: i32 = match conn.query_row(
            "SELECT value FROM meta WHERE key = 'home_dc_id'",
            [],
            |r| r.get(0),
        ) {
            Ok(v) => v,
            Err(rusqlite::Error::QueryReturnedNoRows) => 0,
            Err(e) => return Err(Self::map_err(e)),
        };

        // dcs
        let mut dc_stmt = conn
            .prepare("SELECT dc_id, flags, addr, auth_key, first_salt, time_offset FROM dcs")
            .map_err(Self::map_err)?;
        let dcs = dc_stmt
            .query_map([], |row| {
                let dc_id: i32 = row.get(0)?;
                let flags_raw: u8 = row.get(1)?;
                let addr: String = row.get(2)?;
                let key_blob: Option<Vec<u8>> = row.get(3)?;
                let first_salt: i64 = row.get(4)?;
                let time_offset: i32 = row.get(5)?;
                // A blob that is not exactly 256 bytes is treated as "no key".
                let auth_key = key_blob.and_then(|b| {
                    if b.len() == 256 {
                        let mut k = [0u8; 256];
                        k.copy_from_slice(&b);
                        Some(k)
                    } else {
                        None
                    }
                });
                Ok(DcEntry {
                    dc_id,
                    addr,
                    auth_key,
                    first_salt,
                    time_offset,
                    flags: DcFlags(flags_raw),
                })
            })
            .map_err(Self::map_err)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(Self::map_err)?;

        // update_state (scalar counters only; channels come from channel_pts)
        let scalars = match conn.query_row(
            "SELECT pts, qts, date, seq FROM update_state WHERE id = 1",
            [],
            |r| {
                Ok(UpdatesStateSnap {
                    pts: r.get(0)?,
                    qts: r.get(1)?,
                    date: r.get(2)?,
                    seq: r.get(3)?,
                    channels: vec![],
                })
            },
        ) {
            Ok(snap) => snap,
            Err(rusqlite::Error::QueryReturnedNoRows) => UpdatesStateSnap::default(),
            Err(e) => return Err(Self::map_err(e)),
        };

        // channel_pts
        let mut ch_stmt = conn
            .prepare("SELECT channel_id, pts FROM channel_pts")
            .map_err(Self::map_err)?;
        let channels: Vec<(i64, i32)> = ch_stmt
            .query_map([], |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i32>(1)?)))
            .map_err(Self::map_err)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(Self::map_err)?;

        // peers
        let mut peer_stmt = conn
            .prepare("SELECT id, access_hash, is_channel FROM peers")
            .map_err(Self::map_err)?;
        let peers: Vec<CachedPeer> = peer_stmt
            .query_map([], |r| {
                Ok(CachedPeer {
                    id: r.get(0)?,
                    access_hash: r.get(1)?,
                    is_channel: r.get::<_, i32>(2)? != 0,
                })
            })
            .map_err(Self::map_err)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(Self::map_err)?;

        Ok(PersistedSession {
            home_dc_id,
            dcs,
            updates_state: UpdatesStateSnap {
                channels,
                ..scalars
            },
            peers,
        })
    }

    /// Write the full session into the database inside a single transaction.
    ///
    /// If any statement (or the final `COMMIT`) fails, the transaction is
    /// rolled back before the error is returned, so the connection is never
    /// left inside a half-finished transaction.
    fn write_session(conn: &rusqlite::Connection, s: &PersistedSession) -> io::Result<()> {
        conn.execute_batch("BEGIN IMMEDIATE")
            .map_err(Self::map_err)?;

        let outcome = Self::write_session_rows(conn, s)
            .and_then(|()| conn.execute_batch("COMMIT").map_err(Self::map_err));
        if outcome.is_err() {
            // Best effort: SQLite may already have rolled back on its own, in
            // which case this fails harmlessly; the original error is what
            // the caller needs to see.
            let _ = conn.execute_batch("ROLLBACK");
        }
        outcome
    }

    /// Replace the contents of every table with `s`; the caller owns the
    /// surrounding transaction.
    fn write_session_rows(conn: &rusqlite::Connection, s: &PersistedSession) -> io::Result<()> {
        conn.execute(
            "INSERT INTO meta (key, value) VALUES ('home_dc_id', ?1)
             ON CONFLICT(key) DO UPDATE SET value = excluded.value",
            rusqlite::params![s.home_dc_id],
        )
        .map_err(Self::map_err)?;

        // Replace all DCs (statement prepared once, executed per row)
        conn.execute("DELETE FROM dcs", []).map_err(Self::map_err)?;
        let mut insert_dc = conn
            .prepare(
                "INSERT INTO dcs (dc_id, flags, addr, auth_key, first_salt, time_offset)
                 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
            )
            .map_err(Self::map_err)?;
        for d in &s.dcs {
            insert_dc
                .execute(rusqlite::params![
                    d.dc_id,
                    d.flags.0,
                    d.addr,
                    d.auth_key.as_ref().map(|k| &k[..]),
                    d.first_salt,
                    d.time_offset,
                ])
                .map_err(Self::map_err)?;
        }

        // update_state
        let us = &s.updates_state;
        conn.execute(
            "INSERT INTO update_state (id, pts, qts, date, seq) VALUES (1, ?1, ?2, ?3, ?4)
             ON CONFLICT(id) DO UPDATE SET pts=excluded.pts, qts=excluded.qts,
             date=excluded.date, seq=excluded.seq",
            rusqlite::params![us.pts, us.qts, us.date, us.seq],
        )
        .map_err(Self::map_err)?;

        conn.execute("DELETE FROM channel_pts", [])
            .map_err(Self::map_err)?;
        let mut insert_channel = conn
            .prepare("INSERT INTO channel_pts (channel_id, pts) VALUES (?1, ?2)")
            .map_err(Self::map_err)?;
        for &(cid, cpts) in &us.channels {
            insert_channel
                .execute(rusqlite::params![cid, cpts])
                .map_err(Self::map_err)?;
        }

        // peers
        conn.execute("DELETE FROM peers", [])
            .map_err(Self::map_err)?;
        let mut insert_peer = conn
            .prepare("INSERT INTO peers (id, access_hash, is_channel) VALUES (?1, ?2, ?3)")
            .map_err(Self::map_err)?;
        for p in &s.peers {
            insert_peer
                .execute(rusqlite::params![p.id, p.access_hash, p.is_channel as i32])
                .map_err(Self::map_err)?;
        }

        Ok(())
    }
}
921
922#[cfg(feature = "sqlite-session")]
923impl SessionBackend for SqliteBackend {
924    fn save(&self, session: &PersistedSession) -> io::Result<()> {
925        let conn = self.conn.lock().unwrap();
926        Self::write_session(&conn, session)
927    }
928
929    fn load(&self) -> io::Result<Option<PersistedSession>> {
930        let conn = self.conn.lock().unwrap();
931        // If meta table is empty, no session has been saved yet.
932        let count: i64 = conn
933            .query_row("SELECT COUNT(*) FROM meta", [], |r| r.get(0))
934            .map_err(Self::map_err)?;
935        if count == 0 {
936            return Ok(None);
937        }
938        Self::read_session(&conn).map(Some)
939    }
940
941    fn delete(&self) -> io::Result<()> {
942        let conn = self.conn.lock().unwrap();
943        conn.execute_batch(
944            "BEGIN IMMEDIATE;
945             DELETE FROM meta;
946             DELETE FROM dcs;
947             DELETE FROM update_state;
948             DELETE FROM channel_pts;
949             DELETE FROM peers;
950             COMMIT;",
951        )
952        .map_err(Self::map_err)
953    }
954
955    fn name(&self) -> &str {
956        &self.label
957    }
958
959    // ── Granular overrides (single-row SQL, no full round-trip) ──────────────
960
961    fn update_dc(&self, entry: &DcEntry) -> io::Result<()> {
962        let conn = self.conn.lock().unwrap();
963        conn.execute(
964            "INSERT INTO dcs (dc_id, flags, addr, auth_key, first_salt, time_offset)
965             VALUES (?1, ?6, ?2, ?3, ?4, ?5)
966             ON CONFLICT(dc_id, flags) DO UPDATE SET
967               addr        = excluded.addr,
968               auth_key    = excluded.auth_key,
969               first_salt  = excluded.first_salt,
970               time_offset = excluded.time_offset",
971            rusqlite::params![
972                entry.dc_id,
973                entry.addr,
974                entry.auth_key.as_ref().map(|k| k.as_ref()),
975                entry.first_salt,
976                entry.time_offset,
977                entry.flags.0,
978            ],
979        )
980        .map(|_| ())
981        .map_err(Self::map_err)
982    }
983
984    fn set_home_dc(&self, dc_id: i32) -> io::Result<()> {
985        let conn = self.conn.lock().unwrap();
986        conn.execute(
987            "INSERT INTO meta (key, value) VALUES ('home_dc_id', ?1)
988             ON CONFLICT(key) DO UPDATE SET value = excluded.value",
989            rusqlite::params![dc_id],
990        )
991        .map(|_| ())
992        .map_err(Self::map_err)
993    }
994
995    fn apply_update_state(&self, update: UpdateStateChange) -> io::Result<()> {
996        let conn = self.conn.lock().unwrap();
997        match update {
998            UpdateStateChange::All(snap) => {
999                conn.execute(
1000                    "INSERT INTO update_state (id, pts, qts, date, seq) VALUES (1,?1,?2,?3,?4)
1001                     ON CONFLICT(id) DO UPDATE SET
1002                       pts=excluded.pts, qts=excluded.qts,
1003                       date=excluded.date, seq=excluded.seq",
1004                    rusqlite::params![snap.pts, snap.qts, snap.date, snap.seq],
1005                )
1006                .map_err(Self::map_err)?;
1007                conn.execute("DELETE FROM channel_pts", [])
1008                    .map_err(Self::map_err)?;
1009                for &(cid, cpts) in &snap.channels {
1010                    conn.execute(
1011                        "INSERT INTO channel_pts (channel_id, pts) VALUES (?1, ?2)",
1012                        rusqlite::params![cid, cpts],
1013                    )
1014                    .map_err(Self::map_err)?;
1015                }
1016                Ok(())
1017            }
1018            UpdateStateChange::Primary { pts, date, seq } => conn
1019                .execute(
1020                    "INSERT INTO update_state (id, pts, qts, date, seq) VALUES (1,?1,0,?2,?3)
1021                     ON CONFLICT(id) DO UPDATE SET pts=excluded.pts, date=excluded.date,
1022                     seq=excluded.seq",
1023                    rusqlite::params![pts, date, seq],
1024                )
1025                .map(|_| ())
1026                .map_err(Self::map_err),
1027            UpdateStateChange::Secondary { qts } => conn
1028                .execute(
1029                    "INSERT INTO update_state (id, pts, qts, date, seq) VALUES (1,0,?1,0,0)
1030                     ON CONFLICT(id) DO UPDATE SET qts = excluded.qts",
1031                    rusqlite::params![qts],
1032                )
1033                .map(|_| ())
1034                .map_err(Self::map_err),
1035            UpdateStateChange::Channel { id, pts } => conn
1036                .execute(
1037                    "INSERT INTO channel_pts (channel_id, pts) VALUES (?1, ?2)
1038                     ON CONFLICT(channel_id) DO UPDATE SET pts = excluded.pts",
1039                    rusqlite::params![id, pts],
1040                )
1041                .map(|_| ())
1042                .map_err(Self::map_err),
1043        }
1044    }
1045
1046    fn cache_peer(&self, peer: &CachedPeer) -> io::Result<()> {
1047        let conn = self.conn.lock().unwrap();
1048        conn.execute(
1049            "INSERT INTO peers (id, access_hash, is_channel) VALUES (?1, ?2, ?3)
1050             ON CONFLICT(id) DO UPDATE SET
1051               access_hash = excluded.access_hash,
1052               is_channel  = excluded.is_channel",
1053            rusqlite::params![peer.id, peer.access_hash, peer.is_channel as i32],
1054        )
1055        .map(|_| ())
1056        .map_err(Self::map_err)
1057    }
1058}
1059
1060// ─── LibSqlBackend ────────────────────────────────────────────────────────────
1061
/// libSQL-backed session (Turso / embedded replica / in-process).
///
/// Enabled with the `libsql-session` Cargo feature.
///
/// The libSQL API is async while [`SessionBackend`] methods are sync, so each
/// method blocks the calling thread until the underlying future completes,
/// using the ambient Tokio runtime (`tokio::runtime::Handle::current()`).
/// Call these methods only from a thread that has a Tokio runtime context
/// (i.e. the same runtime as the rest of `layer-client`).
///
/// The connection lives behind an async mutex: every operation locks it
/// first, so multi-statement operations issued through this backend do not
/// interleave with each other.
///
/// # Connecting
///
/// | Mode              | Constructor                        |
/// |-------------------|------------------------------------|
/// | Local file        | `LibSqlBackend::open_local(path)`  |
/// | In-memory         | `LibSqlBackend::in_memory()`       |
/// | Turso remote      | `LibSqlBackend::open_remote(url, token)` |
/// | Embedded replica  | `LibSqlBackend::open_replica(path, url, token)` |
#[cfg(feature = "libsql-session")]
pub struct LibSqlBackend {
    /// Shared connection; locked for the duration of each operation.
    conn: std::sync::Arc<tokio::sync::Mutex<libsql::Connection>>,
    /// Label returned by `name()` (path, URL, or `:memory:`).
    label: String,
}
1084
1085#[cfg(feature = "libsql-session")]
1086impl LibSqlBackend {
    /// DDL applied on every open; all statements are `CREATE TABLE IF NOT
    /// EXISTS`, so re-applying it to an existing database is harmless.
    ///
    /// Tables:
    /// * `meta`: integer key/value pairs (holds `home_dc_id`).
    /// * `dcs`: one row per `(dc_id, flags)` pair, with optional auth key blob.
    /// * `update_state`: singleton row, enforced by `CHECK (id = 1)`.
    /// * `channel_pts`: per-channel pts.
    /// * `peers`: cached peer access hashes.
    const SCHEMA: &'static str = "
        CREATE TABLE IF NOT EXISTS meta (
            key   TEXT    PRIMARY KEY,
            value INTEGER NOT NULL DEFAULT 0
        );
        CREATE TABLE IF NOT EXISTS dcs (
            dc_id       INTEGER NOT NULL,
            flags       INTEGER NOT NULL DEFAULT 0,
            addr        TEXT    NOT NULL,
            auth_key    BLOB,
            first_salt  INTEGER NOT NULL DEFAULT 0,
            time_offset INTEGER NOT NULL DEFAULT 0,
            PRIMARY KEY (dc_id, flags)
        );
        CREATE TABLE IF NOT EXISTS update_state (
            id   INTEGER PRIMARY KEY CHECK (id = 1),
            pts  INTEGER NOT NULL DEFAULT 0,
            qts  INTEGER NOT NULL DEFAULT 0,
            date INTEGER NOT NULL DEFAULT 0,
            seq  INTEGER NOT NULL DEFAULT 0
        );
        CREATE TABLE IF NOT EXISTS channel_pts (
            channel_id INTEGER PRIMARY KEY,
            pts        INTEGER NOT NULL
        );
        CREATE TABLE IF NOT EXISTS peers (
            id          INTEGER PRIMARY KEY,
            access_hash INTEGER NOT NULL,
            is_channel  INTEGER NOT NULL DEFAULT 0
        );
    ";
1118
1119    fn block<F, T>(fut: F) -> io::Result<T>
1120    where
1121        F: std::future::Future<Output = Result<T, libsql::Error>>,
1122    {
1123        tokio::runtime::Handle::current()
1124            .block_on(fut)
1125            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
1126    }
1127
1128    async fn apply_schema(conn: &libsql::Connection) -> Result<(), libsql::Error> {
1129        conn.execute_batch(Self::SCHEMA).await
1130    }
1131
1132    /// Open a local file database.
1133    pub fn open_local(path: impl Into<PathBuf>) -> io::Result<Self> {
1134        let path = path.into();
1135        let label = path.display().to_string();
1136        let db = Self::block(async { libsql::Builder::new_local(path).build().await })?;
1137        let conn = Self::block(async { db.connect() })
1138            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
1139        Self::block(Self::apply_schema(&conn))?;
1140        Ok(Self {
1141            conn: std::sync::Arc::new(tokio::sync::Mutex::new(conn)),
1142            label,
1143        })
1144    }
1145
1146    /// Open an in-process in-memory database (useful for tests).
1147    pub fn in_memory() -> io::Result<Self> {
1148        let db = Self::block(async { libsql::Builder::new_local(":memory:").build().await })?;
1149        let conn = Self::block(async { db.connect() })
1150            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
1151        Self::block(Self::apply_schema(&conn))?;
1152        Ok(Self {
1153            conn: std::sync::Arc::new(tokio::sync::Mutex::new(conn)),
1154            label: ":memory:".into(),
1155        })
1156    }
1157
1158    /// Connect to a remote Turso database.
1159    pub fn open_remote(url: impl Into<String>, auth_token: impl Into<String>) -> io::Result<Self> {
1160        let url = url.into();
1161        let label = url.clone();
1162        let db = Self::block(async {
1163            libsql::Builder::new_remote(url, auth_token.into())
1164                .build()
1165                .await
1166        })?;
1167        let conn = Self::block(async { db.connect() })
1168            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
1169        Self::block(Self::apply_schema(&conn))?;
1170        Ok(Self {
1171            conn: std::sync::Arc::new(tokio::sync::Mutex::new(conn)),
1172            label,
1173        })
1174    }
1175
1176    /// Open an embedded replica (local file + Turso remote sync).
1177    pub fn open_replica(
1178        path: impl Into<PathBuf>,
1179        url: impl Into<String>,
1180        auth_token: impl Into<String>,
1181    ) -> io::Result<Self> {
1182        let path = path.into();
1183        let label = format!("{} (replica of {})", path.display(), url.into());
1184        let db = Self::block(async {
1185            libsql::Builder::new_remote_replica(path, url.into(), auth_token.into())
1186                .build()
1187                .await
1188        })?;
1189        let conn = Self::block(async { db.connect() })
1190            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
1191        Self::block(Self::apply_schema(&conn))?;
1192        Ok(Self {
1193            conn: std::sync::Arc::new(tokio::sync::Mutex::new(conn)),
1194            label,
1195        })
1196    }
1197
1198    async fn read_session_async(
1199        conn: &libsql::Connection,
1200    ) -> Result<PersistedSession, libsql::Error> {
1201        use libsql::de;
1202
1203        // home_dc_id
1204        let home_dc_id: i32 = conn
1205            .query("SELECT value FROM meta WHERE key = 'home_dc_id'", ())
1206            .await?
1207            .next()
1208            .await?
1209            .map(|r| r.get::<i32>(0))
1210            .transpose()?
1211            .unwrap_or(0);
1212
1213        // dcs
1214        let mut rows = conn
1215            .query(
1216                "SELECT dc_id, flags, addr, auth_key, first_salt, time_offset FROM dcs",
1217                (),
1218            )
1219            .await?;
1220        let mut dcs = Vec::new();
1221        while let Some(row) = rows.next().await? {
1222            let dc_id: i32 = row.get(0)?;
1223            let flags_raw: u8 = row.get::<i64>(1)? as u8;
1224            let addr: String = row.get(2)?;
1225            let key_blob: Option<Vec<u8>> = row.get(3)?;
1226            let first_salt: i64 = row.get(4)?;
1227            let time_offset: i32 = row.get(5)?;
1228            let auth_key = match key_blob {
1229                Some(b) if b.len() == 256 => {
1230                    let mut k = [0u8; 256];
1231                    k.copy_from_slice(&b);
1232                    Some(k)
1233                }
1234                Some(b) => {
1235                    return Err(libsql::Error::Misuse(format!(
1236                        "auth_key blob must be 256 bytes, got {}",
1237                        b.len()
1238                    )));
1239                }
1240                None => None,
1241            };
1242            dcs.push(DcEntry {
1243                dc_id,
1244                addr,
1245                auth_key,
1246                first_salt,
1247                time_offset,
1248                flags: DcFlags(flags_raw),
1249            });
1250        }
1251
1252        // update_state
1253        let mut us_row = conn
1254            .query(
1255                "SELECT pts, qts, date, seq FROM update_state WHERE id = 1",
1256                (),
1257            )
1258            .await?;
1259        let updates_state = if let Some(r) = us_row.next().await? {
1260            UpdatesStateSnap {
1261                pts: r.get(0)?,
1262                qts: r.get(1)?,
1263                date: r.get(2)?,
1264                seq: r.get(3)?,
1265                channels: vec![],
1266            }
1267        } else {
1268            UpdatesStateSnap::default()
1269        };
1270
1271        // channel_pts
1272        let mut ch_rows = conn
1273            .query("SELECT channel_id, pts FROM channel_pts", ())
1274            .await?;
1275        let mut channels = Vec::new();
1276        while let Some(r) = ch_rows.next().await? {
1277            channels.push((r.get::<i64>(0)?, r.get::<i32>(1)?));
1278        }
1279
1280        // peers
1281        let mut peer_rows = conn
1282            .query("SELECT id, access_hash, is_channel FROM peers", ())
1283            .await?;
1284        let mut peers = Vec::new();
1285        while let Some(r) = peer_rows.next().await? {
1286            peers.push(CachedPeer {
1287                id: r.get(0)?,
1288                access_hash: r.get(1)?,
1289                is_channel: r.get::<i32>(2)? != 0,
1290            });
1291        }
1292
1293        Ok(PersistedSession {
1294            home_dc_id,
1295            dcs,
1296            updates_state: UpdatesStateSnap {
1297                channels,
1298                ..updates_state
1299            },
1300            peers,
1301        })
1302    }
1303
1304    async fn write_session_async(
1305        conn: &libsql::Connection,
1306        s: &PersistedSession,
1307    ) -> Result<(), libsql::Error> {
1308        conn.execute_batch("BEGIN IMMEDIATE").await?;
1309
1310        conn.execute(
1311            "INSERT INTO meta (key, value) VALUES ('home_dc_id', ?1)
1312             ON CONFLICT(key) DO UPDATE SET value = excluded.value",
1313            libsql::params![s.home_dc_id],
1314        )
1315        .await?;
1316
1317        conn.execute("DELETE FROM dcs", ()).await?;
1318        for d in &s.dcs {
1319            conn.execute(
1320                "INSERT INTO dcs (dc_id, flags, addr, auth_key, first_salt, time_offset)
1321                 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1322                libsql::params![
1323                    d.dc_id,
1324                    d.flags.0 as i64,
1325                    d.addr.clone(),
1326                    d.auth_key.map(|k| k.to_vec()),
1327                    d.first_salt,
1328                    d.time_offset,
1329                ],
1330            )
1331            .await?;
1332        }
1333
1334        let us = &s.updates_state;
1335        conn.execute(
1336            "INSERT INTO update_state (id, pts, qts, date, seq) VALUES (1,?1,?2,?3,?4)
1337             ON CONFLICT(id) DO UPDATE SET pts=excluded.pts,qts=excluded.qts,
1338             date=excluded.date,seq=excluded.seq",
1339            libsql::params![us.pts, us.qts, us.date, us.seq],
1340        )
1341        .await?;
1342
1343        conn.execute("DELETE FROM channel_pts", ()).await?;
1344        for &(cid, cpts) in &us.channels {
1345            conn.execute(
1346                "INSERT INTO channel_pts (channel_id, pts) VALUES (?1,?2)",
1347                libsql::params![cid, cpts],
1348            )
1349            .await?;
1350        }
1351
1352        conn.execute("DELETE FROM peers", ()).await?;
1353        for p in &s.peers {
1354            conn.execute(
1355                "INSERT INTO peers (id, access_hash, is_channel) VALUES (?1,?2,?3)",
1356                libsql::params![p.id, p.access_hash, p.is_channel as i32],
1357            )
1358            .await?;
1359        }
1360
1361        conn.execute_batch("COMMIT").await
1362    }
1363}
1364
1365#[cfg(feature = "libsql-session")]
1366impl SessionBackend for LibSqlBackend {
1367    fn save(&self, session: &PersistedSession) -> io::Result<()> {
1368        let conn = self.conn.clone();
1369        let session = session.clone();
1370        Self::block(async move {
1371            let conn = conn.lock().await;
1372            Self::write_session_async(&conn, session).await
1373        })
1374    }
1375
1376    fn load(&self) -> io::Result<Option<PersistedSession>> {
1377        let conn = self.conn.clone();
1378        let count: i64 = Self::block(async move {
1379            let conn = conn.lock().await;
1380            let mut rows = conn.query("SELECT COUNT(*) FROM meta", ()).await?;
1381            Ok::<i64, libsql::Error>(rows.next().await?.and_then(|r| r.get(0).ok()).unwrap_or(0))
1382        })?;
1383        if count == 0 {
1384            return Ok(None);
1385        }
1386        let conn = self.conn.clone();
1387        Self::block(async move {
1388            let conn = conn.lock().await;
1389            Self::read_session_async(&conn).await
1390        })
1391        .map(Some)
1392    }
1393
1394    fn delete(&self) -> io::Result<()> {
1395        let conn = self.conn.clone();
1396        Self::block(async move {
1397            let conn = conn.lock().await;
1398            conn.execute_batch(
1399                "BEGIN IMMEDIATE;
1400                 DELETE FROM meta;
1401                 DELETE FROM dcs;
1402                 DELETE FROM update_state;
1403                 DELETE FROM channel_pts;
1404                 DELETE FROM peers;
1405                 COMMIT;",
1406            )
1407            .await
1408        })
1409    }
1410
1411    fn name(&self) -> &str {
1412        &self.label
1413    }
1414
1415    // ── Granular overrides ───────────────────────────────────────────────────
1416
1417    fn update_dc(&self, entry: &DcEntry) -> io::Result<()> {
1418        let conn = self.conn.clone();
1419        let (dc_id, addr, key, salt, off, flags) = (
1420            entry.dc_id,
1421            entry.addr.clone(),
1422            entry.auth_key.map(|k| k.to_vec()),
1423            entry.first_salt,
1424            entry.time_offset,
1425            entry.flags.0 as i64,
1426        );
1427        Self::block(async move {
1428            let conn = conn.lock().await;
1429            conn.execute(
1430                "INSERT INTO dcs (dc_id, flags, addr, auth_key, first_salt, time_offset)
1431                 VALUES (?1,?6,?2,?3,?4,?5)
1432                 ON CONFLICT(dc_id, flags) DO UPDATE SET
1433                   addr=excluded.addr, auth_key=excluded.auth_key,
1434                   first_salt=excluded.first_salt, time_offset=excluded.time_offset",
1435                libsql::params![dc_id, addr, key, salt, off, flags],
1436            )
1437            .await
1438            .map(|_| ())
1439        })
1440    }
1441
1442    fn set_home_dc(&self, dc_id: i32) -> io::Result<()> {
1443        let conn = self.conn.clone();
1444        Self::block(async move {
1445            let conn = conn.lock().await;
1446            conn.execute(
1447                "INSERT INTO meta (key, value) VALUES ('home_dc_id',?1)
1448                 ON CONFLICT(key) DO UPDATE SET value=excluded.value",
1449                libsql::params![dc_id],
1450            )
1451            .await
1452            .map(|_| ())
1453        })
1454    }
1455
1456    fn apply_update_state(&self, update: UpdateStateChange) -> io::Result<()> {
1457        let conn = self.conn.clone();
1458        Self::block(async move {
1459            let conn = conn.lock().await;
1460            match update {
1461                UpdateStateChange::All(snap) => {
1462                    conn.execute(
1463                        "INSERT INTO update_state (id,pts,qts,date,seq) VALUES (1,?1,?2,?3,?4)
1464                         ON CONFLICT(id) DO UPDATE SET pts=excluded.pts,qts=excluded.qts,
1465                         date=excluded.date,seq=excluded.seq",
1466                        libsql::params![snap.pts, snap.qts, snap.date, snap.seq],
1467                    )
1468                    .await?;
1469                    conn.execute("DELETE FROM channel_pts", ()).await?;
1470                    for &(cid, cpts) in &snap.channels {
1471                        conn.execute(
1472                            "INSERT INTO channel_pts (channel_id,pts) VALUES (?1,?2)",
1473                            libsql::params![cid, cpts],
1474                        )
1475                        .await?;
1476                    }
1477                    Ok(())
1478                }
1479                UpdateStateChange::Primary { pts, date, seq } => conn
1480                    .execute(
1481                        "INSERT INTO update_state (id,pts,qts,date,seq) VALUES (1,?1,0,?2,?3)
1482                         ON CONFLICT(id) DO UPDATE SET pts=excluded.pts,date=excluded.date,
1483                         seq=excluded.seq",
1484                        libsql::params![pts, date, seq],
1485                    )
1486                    .await
1487                    .map(|_| ()),
1488                UpdateStateChange::Secondary { qts } => conn
1489                    .execute(
1490                        "INSERT INTO update_state (id,pts,qts,date,seq) VALUES (1,0,?1,0,0)
1491                         ON CONFLICT(id) DO UPDATE SET qts=excluded.qts",
1492                        libsql::params![qts],
1493                    )
1494                    .await
1495                    .map(|_| ()),
1496                UpdateStateChange::Channel { id, pts } => conn
1497                    .execute(
1498                        "INSERT INTO channel_pts (channel_id,pts) VALUES (?1,?2)
1499                         ON CONFLICT(channel_id) DO UPDATE SET pts=excluded.pts",
1500                        libsql::params![id, pts],
1501                    )
1502                    .await
1503                    .map(|_| ()),
1504            }
1505        })
1506    }
1507
1508    fn cache_peer(&self, peer: &CachedPeer) -> io::Result<()> {
1509        let conn = self.conn.clone();
1510        let (id, hash, is_ch) = (peer.id, peer.access_hash, peer.is_channel as i32);
1511        Self::block(async move {
1512            let conn = conn.lock().await;
1513            conn.execute(
1514                "INSERT INTO peers (id,access_hash,is_channel) VALUES (?1,?2,?3)
1515                 ON CONFLICT(id) DO UPDATE SET
1516                   access_hash=excluded.access_hash,
1517                   is_channel=excluded.is_channel",
1518                libsql::params![id, hash, is_ch],
1519            )
1520            .await
1521            .map(|_| ())
1522        })
1523    }
1524}