Skip to main content

ferogram_session/
lib.rs

1// Copyright (c) Ankit Chaubey <ankitchaubey.dev@gmail.com>
2//
3// ferogram: async Telegram MTProto client in Rust
4// https://github.com/ankit-chaubey/ferogram
5//
6// Licensed under either the MIT License or the Apache License 2.0.
7// See the LICENSE-MIT or LICENSE-APACHE file in this repository:
8// https://github.com/ankit-chaubey/ferogram
9//
10// Feel free to use, modify, and share this code.
11// Please keep this notice when redistributing.
12
13#![deny(unsafe_code)]
14#![cfg_attr(docsrs, feature(doc_cfg))]
15//! Session persistence types and storage backends for ferogram.
16//!
17//! This crate is part of [ferogram](https://crates.io/crates/ferogram), an async Rust
18//! MTProto client built by [Ankit Chaubey](https://github.com/ankit-chaubey).
19//!
20//! - Channel: [t.me/Ferogram](https://t.me/Ferogram)
21//! - Chat: [t.me/FerogramChat](https://t.me/FerogramChat)
22//!
23//! Most users do not use this crate directly. The `ferogram` crate wires it up
24//! automatically when you call `Client::builder().session(...)` or
25//! `.session_string(...)`.
26//!
27//! # What's in here
28//!
29//! - [`PersistedSession`]: the serializable session struct. Holds the DC table
30//!   (one `AuthKey` + salt + time offset per DC), update sequence counters
31//!   (PTS, QTS, date, seq), and the peer access-hash cache.
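//! - [`SessionBackend`]: the trait all backends implement. Required methods:
//!   `save(&PersistedSession)`, `load() -> io::Result<Option<PersistedSession>>`,
//!   `delete()` and `name()`. The granular update helpers (`update_dc`,
//!   `set_home_dc`, `apply_update_state`, `cache_peer`) have default
//!   implementations.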
34//! - [`BinaryFileBackend`]: stores the session as a binary file on disk.
35//!   Good for bots and scripts. No extra dependencies.
36//! - [`InMemoryBackend`]: keeps everything in memory. Nothing survives process
37//!   exit. Used for tests and ephemeral tasks.
38//! - [`StringSessionBackend`]: serializes the session to a base64 string.
39//!   Useful for serverless environments where you store state in an env var or
40//!   database column. Load it with `Client::builder().session_string(s)`.
41//! - [`SqliteBackend`]: SQLite-backed storage via rusqlite. Behind the
42//!   `sqlite-session` feature flag. Good for local multi-account tooling.
43//! - [`LibSqlBackend`]: libSQL / Turso backend. Behind `libsql-session`.
44//!   For distributed or edge-hosted session storage.
45//!
46//! You can also implement `SessionBackend` yourself for Redis, PostgreSQL, or
47//! anything else.
48//!
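//! # Binary format
//!
//! The binary encoding normally starts with a version byte:
//! - `0x02`: DC table + update state + peer cache.
//! - `0x03`: adds one flags byte per DC entry.
//! - `0x04`: adds the min-user message contexts.
//! - `0x05`: current; the peer type byte may also be `2` (regular group chat).
//!
//! Any other first byte is decoded as the legacy v1 layout (DC table only, no
//! update state or peer cache), in which the first four bytes are the home DC ID.
//!
//! Decoding handles all of these. `save()` always writes v5.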
56//!
57//! # Example: export and re-import a session
58//!
59//! ```rust,ignore
60//! # async fn example(client: ferogram::Client) -> anyhow::Result<()> {
61//! // Export
62//! let s = client.export_session_string().await?;
63//!
64//! // Later, in another process or after a restart:
65//! let (client, _) = ferogram::Client::builder()
66//!     .api_id(12345)
67//!     .api_hash("api_hash")
68//!     .session_string(s)
69//!     .connect()
70//!     .await?;
71//! # Ok(())
72//! # }
73//! ```
74
75use std::collections::HashMap;
76use std::io::{self, ErrorKind};
77use std::path::Path;
78
#[cfg(feature = "serde")]
mod auth_key_serde {
    //! Serde adapter for `Option<[u8; 256]>` fields, intended for use through
    //! `#[serde(with = "auth_key_serde")]`.

    use serde::{Deserialize, Deserializer, Serializer};

    /// Serialize a present key through `serialize_some` on its byte slice and
    /// an absent key through `serialize_none`.
    pub fn serialize<S>(value: &Option<[u8; 256]>, s: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        if let Some(key) = value {
            s.serialize_some(key.as_slice())
        } else {
            s.serialize_none()
        }
    }

    /// Deserialize an optional byte vector and require a present value to be
    /// exactly 256 bytes long.
    pub fn deserialize<'de, D>(d: D) -> Result<Option<[u8; 256]>, D::Error>
    where
        D: Deserializer<'de>,
    {
        Option::<Vec<u8>>::deserialize(d)?
            .map(|raw| {
                <[u8; 256]>::try_from(raw)
                    .map_err(|_| serde::de::Error::custom("auth_key must be exactly 256 bytes"))
            })
            .transpose()
    }
}
109
/// Bit set of per-DC options.
///
/// Stored in the session (v3+) so media DCs survive restarts.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct DcFlags(pub u8);

impl DcFlags {
    /// No bits set.
    pub const NONE: DcFlags = DcFlags(0);
    /// Bit 0.
    pub const IPV6: DcFlags = DcFlags(0b0000_0001);
    /// Bit 1.
    pub const MEDIA_ONLY: DcFlags = DcFlags(0b0000_0010);
    /// Bit 2.
    pub const TCPO_ONLY: DcFlags = DcFlags(0b0000_0100);
    /// Bit 3.
    pub const CDN: DcFlags = DcFlags(0b0000_1000);
    /// Bit 4.
    pub const STATIC: DcFlags = DcFlags(0b0001_0000);

    /// Returns `true` when every bit of `other` is also set in `self`.
    ///
    /// An empty `other` is contained in every value.
    pub fn contains(self, other: DcFlags) -> bool {
        // `other` is a subset of `self` exactly when it has no bit that `self` lacks.
        (other.0 & !self.0) == 0
    }

    /// Turn on every bit of `flag` in place; bits already set stay set.
    pub fn set(&mut self, flag: DcFlags) {
        *self = *self | flag;
    }
}

/// `a | b` is the union of both bit sets.
impl std::ops::BitOr for DcFlags {
    type Output = DcFlags;

    fn bitor(self, rhs: DcFlags) -> DcFlags {
        let DcFlags(left) = self;
        let DcFlags(right) = rhs;
        DcFlags(left | right)
    }
}
140
141/// One entry in the DC address table.
142#[derive(Clone, Debug)]
143#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
144pub struct DcEntry {
145    pub dc_id: i32,
146    pub addr: String,
147    #[cfg_attr(feature = "serde", serde(with = "auth_key_serde"))]
148    pub auth_key: Option<[u8; 256]>,
149    pub first_salt: i64,
150    pub time_offset: i32,
151    /// DC capability flags (IPv6, media-only, CDN, ...).
152    pub flags: DcFlags,
153}
154
155impl DcEntry {
156    /// Returns `true` when this entry represents an IPv6 address.
157    #[inline]
158    pub fn is_ipv6(&self) -> bool {
159        self.flags.contains(DcFlags::IPV6)
160    }
161
162    /// Parse the stored `"ip:port"` / `"[ipv6]:port"` address into a
163    /// [`std::net::SocketAddr`].
164    ///
165    /// Both formats are valid:
166    /// - IPv4: `"149.154.175.53:443"`
167    /// - IPv6: `"[2001:b28:f23d:f001::a]:443"`
168    pub fn socket_addr(&self) -> io::Result<std::net::SocketAddr> {
169        self.addr.parse::<std::net::SocketAddr>().map_err(|_| {
170            io::Error::new(
171                io::ErrorKind::InvalidData,
172                format!("invalid DC address: {:?}", self.addr),
173            )
174        })
175    }
176
177    /// Construct a `DcEntry` from separate IP string, port, and flags.
178    ///
179    /// IPv6 addresses are automatically wrapped in brackets so that
180    /// `socket_addr()` can round-trip them correctly:
181    ///
182    /// ```text
183    /// DcEntry::from_parts(2, "2001:b28:f23d:f001::a", 443, DcFlags::IPV6)
184    /// // addr = "[2001:b28:f23d:f001::a]:443"
185    /// ```
186    ///
187    /// This is the preferred constructor when processing `help.getConfig`
188    /// `DcOption` objects from the Telegram API.
189    pub fn from_parts(dc_id: i32, ip: &str, port: u16, flags: DcFlags) -> Self {
190        // IPv6 addresses contain colons; wrap in brackets for SocketAddr compat.
191        let addr = if ip.contains(':') {
192            format!("[{ip}]:{port}")
193        } else {
194            format!("{ip}:{port}")
195        };
196        Self {
197            dc_id,
198            addr,
199            auth_key: None,
200            first_salt: 0,
201            time_offset: 0,
202            flags,
203        }
204    }
205}
206
/// Persisted copy of the MTProto update-sequence state, kept so that
/// `catch_up: true` can call `updates.getDifference` with the *pre-shutdown* pts.
#[derive(Clone, Debug, Default)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct UpdatesStateSnap {
    /// Main persistence counter (messages, non-channel updates).
    pub pts: i32,
    /// Secondary counter for secret chats.
    pub qts: i32,
    /// Date of the last received update (Unix timestamp).
    pub date: i32,
    /// Combined-container sequence number.
    pub seq: i32,
    /// Per-channel persistence counters as `(channel_id, pts)` pairs.
    pub channels: Vec<(i64, i32)>,
}

impl UpdatesStateSnap {
    /// `true` once a real server state has been recorded, i.e. `pts > 0`.
    #[inline]
    pub fn is_initialised(&self) -> bool {
        self.pts.is_positive()
    }

    /// Store `pts` for `channel_id`, overwriting the existing pair for that
    /// channel or appending a new one when the channel is not present yet.
    pub fn set_channel_pts(&mut self, channel_id: i64, pts: i32) {
        match self.channels.iter().position(|&(id, _)| id == channel_id) {
            Some(idx) => self.channels[idx].1 = pts,
            None => self.channels.push((channel_id, pts)),
        }
    }

    /// Stored pts of `channel_id`, or `0` when the channel is unknown.
    pub fn channel_pts(&self, channel_id: i64) -> i32 {
        self.channels
            .iter()
            .find_map(|&(id, pts)| (id == channel_id).then_some(pts))
            .unwrap_or(0)
    }
}
249
/// A cached access-hash entry so that the peer can be addressed across restarts
/// without re-resolving it from Telegram.
///
/// The binary session format stores the two flags below as one type byte:
/// `2` when `is_chat` is set, otherwise `1` when `is_channel` is set,
/// otherwise `0` (user). If both flags are set, `is_chat` takes precedence.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct CachedPeer {
    /// Bare Telegram peer ID (always positive).
    pub id: i64,
    /// Access hash bound to the current session.
    /// Always 0 for regular group chats (they need no access_hash).
    pub access_hash: i64,
    /// `true`: channel / supergroup. `false`: user or regular group.
    pub is_channel: bool,
    /// `true`: regular group chat (Chat::Chat / ChatForbidden).
    /// When true, access_hash is meaningless (groups need no hash).
    pub is_chat: bool,
}
266
/// A min-user context entry: the user was seen with `min=true` (access_hash
/// not usable directly) so we store the peer+message where they appeared so
/// that `InputPeerUserFromMessage` can be constructed on restart.
///
/// In the binary session format these entries are only present from
/// version 4 onwards; older data decodes to an empty list.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct CachedMinPeer {
    /// The min user's ID.
    pub user_id: i64,
    /// The channel/chat/user ID of the peer that contained the message.
    pub peer_id: i64,
    /// The message ID within that peer.
    pub msg_id: i32,
}
280
/// Everything that needs to survive a process restart.
///
/// `Default` yields an empty session: home DC ID `0`, no DC entries, zeroed
/// update counters and empty peer caches.
#[derive(Clone, Debug, Default)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct PersistedSession {
    /// ID of the account's home DC (changed through `SessionBackend::set_home_dc`).
    pub home_dc_id: i32,
    /// DC address table. The same `dc_id` may appear more than once, e.g. one
    /// IPv4 and one IPv6 entry; use `dc_for` / `all_dcs_for` to look entries up.
    pub dcs: Vec<DcEntry>,
    /// Update counters to enable reliable catch-up after a disconnect.
    pub updates_state: UpdatesStateSnap,
    /// Peer access-hash cache so that the client can reach out to any previously
    /// seen user or channel without re-resolving them.
    pub peers: Vec<CachedPeer>,
    /// Min-user message contexts: users seen with `min=true` that can only be
    /// addressed via `InputPeerUserFromMessage`.
    pub min_peers: Vec<CachedMinPeer>,
}
296
297impl PersistedSession {
298    /// Encode the session to raw bytes (v2 binary format).
299    pub fn to_bytes(&self) -> Vec<u8> {
300        let mut b = Vec::with_capacity(512);
301
302        b.push(0x05u8); // version
303
304        b.extend_from_slice(&self.home_dc_id.to_le_bytes());
305
306        b.push(self.dcs.len() as u8);
307        for d in &self.dcs {
308            b.extend_from_slice(&d.dc_id.to_le_bytes());
309            match &d.auth_key {
310                Some(k) => {
311                    b.push(1);
312                    b.extend_from_slice(k);
313                }
314                None => {
315                    b.push(0);
316                }
317            }
318            b.extend_from_slice(&d.first_salt.to_le_bytes());
319            b.extend_from_slice(&d.time_offset.to_le_bytes());
320            let ab = d.addr.as_bytes();
321            b.push(ab.len() as u8);
322            b.extend_from_slice(ab);
323            b.push(d.flags.0);
324        }
325
326        b.extend_from_slice(&self.updates_state.pts.to_le_bytes());
327        b.extend_from_slice(&self.updates_state.qts.to_le_bytes());
328        b.extend_from_slice(&self.updates_state.date.to_le_bytes());
329        b.extend_from_slice(&self.updates_state.seq.to_le_bytes());
330        let ch = &self.updates_state.channels;
331        b.extend_from_slice(&(ch.len() as u16).to_le_bytes());
332        for &(cid, cpts) in ch {
333            b.extend_from_slice(&cid.to_le_bytes());
334            b.extend_from_slice(&cpts.to_le_bytes());
335        }
336
337        // v5 peer type: 0=user, 1=channel, 2=regular-group-chat
338        b.extend_from_slice(&(self.peers.len() as u16).to_le_bytes());
339        for p in &self.peers {
340            b.extend_from_slice(&p.id.to_le_bytes());
341            b.extend_from_slice(&p.access_hash.to_le_bytes());
342            let peer_type: u8 = if p.is_chat {
343                2
344            } else if p.is_channel {
345                1
346            } else {
347                0
348            };
349            b.push(peer_type);
350        }
351
352        b.extend_from_slice(&(self.min_peers.len() as u16).to_le_bytes());
353        for m in &self.min_peers {
354            b.extend_from_slice(&m.user_id.to_le_bytes());
355            b.extend_from_slice(&m.peer_id.to_le_bytes());
356            b.extend_from_slice(&m.msg_id.to_le_bytes());
357        }
358
359        b
360    }
361
362    /// Atomically save the session to `path`.
363    ///
364    /// Writes to `<path>.<seq>.tmp` (unique per call) then renames into place.
365    /// A fixed `.tmp` extension causes OS error 2 (ERROR_FILE_NOT_FOUND) on
366    /// Windows when two concurrent persist_state calls race: thread A renames
367    /// `.tmp` away while thread B's rename finds the source gone.
368    pub fn save(&self, path: &Path) -> io::Result<()> {
369        use std::sync::atomic::{AtomicU64, Ordering};
370        static SEQ: AtomicU64 = AtomicU64::new(0);
371        let n = SEQ.fetch_add(1, Ordering::Relaxed);
372        let tmp = path.with_extension(format!("{n}.tmp"));
373        std::fs::write(&tmp, self.to_bytes())?;
374        std::fs::rename(&tmp, path).inspect_err(|_e| {
375            let _ = std::fs::remove_file(&tmp);
376        })
377    }
378
379    /// Decode a session from raw bytes (v1 or v2 binary format).
380    pub fn from_bytes(buf: &[u8]) -> io::Result<Self> {
381        if buf.is_empty() {
382            return Err(io::Error::new(ErrorKind::InvalidData, "empty session data"));
383        }
384
385        let mut p = 0usize;
386
387        macro_rules! r {
388            ($n:expr) => {{
389                if p + $n > buf.len() {
390                    return Err(io::Error::new(ErrorKind::InvalidData, "truncated session"));
391                }
392                let s = &buf[p..p + $n];
393                p += $n;
394                s
395            }};
396        }
397        macro_rules! r_i32 {
398            () => {
399                i32::from_le_bytes(r!(4).try_into().unwrap())
400            };
401        }
402        macro_rules! r_i64 {
403            () => {
404                i64::from_le_bytes(r!(8).try_into().unwrap())
405            };
406        }
407        macro_rules! r_u8 {
408            () => {
409                r!(1)[0]
410            };
411        }
412        macro_rules! r_u16 {
413            () => {
414                u16::from_le_bytes(r!(2).try_into().unwrap())
415            };
416        }
417
418        let first_byte = r_u8!();
419
420        let (home_dc_id, version) = if first_byte == 0x05 {
421            (r_i32!(), 5u8)
422        } else if first_byte == 0x04 {
423            (r_i32!(), 4u8)
424        } else if first_byte == 0x03 {
425            (r_i32!(), 3u8)
426        } else if first_byte == 0x02 {
427            (r_i32!(), 2u8)
428        } else {
429            let rest = r!(3);
430            let mut bytes = [0u8; 4];
431            bytes[0] = first_byte;
432            bytes[1..4].copy_from_slice(rest);
433            (i32::from_le_bytes(bytes), 1u8)
434        };
435
436        let dc_count = r_u8!() as usize;
437        let mut dcs = Vec::with_capacity(dc_count);
438        for _ in 0..dc_count {
439            let dc_id = r_i32!();
440            let has_key = r_u8!();
441            let auth_key = if has_key == 1 {
442                let mut k = [0u8; 256];
443                k.copy_from_slice(r!(256));
444                Some(k)
445            } else {
446                None
447            };
448            let first_salt = r_i64!();
449            let time_offset = r_i32!();
450            let al = r_u8!() as usize;
451            let addr = String::from_utf8_lossy(r!(al)).into_owned();
452            let flags = if version >= 3 {
453                DcFlags(r_u8!())
454            } else {
455                DcFlags::NONE
456            };
457            dcs.push(DcEntry {
458                dc_id,
459                addr,
460                auth_key,
461                first_salt,
462                time_offset,
463                flags,
464            });
465        }
466
467        if version < 2 {
468            return Ok(Self {
469                home_dc_id,
470                dcs,
471                updates_state: UpdatesStateSnap::default(),
472                peers: Vec::new(),
473                min_peers: Vec::new(),
474            });
475        }
476
477        let pts = r_i32!();
478        let qts = r_i32!();
479        let date = r_i32!();
480        let seq = r_i32!();
481        let ch_count = r_u16!() as usize;
482        let mut channels = Vec::with_capacity(ch_count);
483        for _ in 0..ch_count {
484            let cid = r_i64!();
485            let cpts = r_i32!();
486            channels.push((cid, cpts));
487        }
488
489        let peer_count = r_u16!() as usize;
490        let mut peers = Vec::with_capacity(peer_count);
491        for _ in 0..peer_count {
492            let id = r_i64!();
493            let access_hash = r_i64!();
494            // v5: type byte 0=user, 1=channel, 2=chat; v2-v4: 0=user, 1=channel
495            let peer_type = r_u8!();
496            let is_channel = peer_type == 1;
497            let is_chat = peer_type == 2;
498            peers.push(CachedPeer {
499                id,
500                access_hash,
501                is_channel,
502                is_chat,
503            });
504        }
505
506        // v4+: min-user contexts
507        let min_peers = if version >= 4 {
508            let count = r_u16!() as usize;
509            let mut v = Vec::with_capacity(count);
510            for _ in 0..count {
511                let user_id = r_i64!();
512                let peer_id = r_i64!();
513                let msg_id = r_i32!();
514                v.push(CachedMinPeer {
515                    user_id,
516                    peer_id,
517                    msg_id,
518                });
519            }
520            v
521        } else {
522            Vec::new()
523        };
524
525        Ok(Self {
526            home_dc_id,
527            dcs,
528            updates_state: UpdatesStateSnap {
529                pts,
530                qts,
531                date,
532                seq,
533                channels,
534            },
535            peers,
536            min_peers,
537        })
538    }
539
540    /// Decode a session from a URL-safe base64 string produced by [`to_string`].
541    pub fn from_string(s: &str) -> io::Result<Self> {
542        use base64::Engine as _;
543        let bytes = base64::engine::general_purpose::URL_SAFE_NO_PAD
544            .decode(s.trim())
545            .map_err(|e| io::Error::new(ErrorKind::InvalidData, e))?;
546        Self::from_bytes(&bytes)
547    }
548
549    pub fn load(path: &Path) -> io::Result<Self> {
550        let buf = std::fs::read(path)?;
551        Self::from_bytes(&buf)
552    }
553
554    // DC address helpers
555
556    /// Find the best DC entry for a given DC ID.
557    ///
558    /// When `prefer_ipv6` is `true`, returns the IPv6 entry if one is
559    /// stored, falling back to IPv4.  When `false`, returns IPv4,
560    /// falling back to IPv6.  Returns `None` only when the DC ID is
561    /// completely unknown.
562    ///
563    /// This correctly handles the case where both an IPv4 and an IPv6
564    /// `DcEntry` exist for the same `dc_id` (different `flags` bitmask).
565    pub fn dc_for(&self, dc_id: i32, prefer_ipv6: bool) -> Option<&DcEntry> {
566        let mut candidates = self.dcs.iter().filter(|d| d.dc_id == dc_id).peekable();
567        candidates.peek()?;
568        // Collect so we can search twice
569        let cands: Vec<&DcEntry> = self.dcs.iter().filter(|d| d.dc_id == dc_id).collect();
570        // Preferred family first, fall back to whatever is available
571        cands
572            .iter()
573            .copied()
574            .find(|d| d.is_ipv6() == prefer_ipv6)
575            .or_else(|| cands.first().copied())
576    }
577
578    /// Iterate over every stored DC entry for a given DC ID.
579    ///
580    /// Typically yields one IPv4 and one IPv6 entry per DC ID once
581    /// `help.getConfig` has been applied.
582    pub fn all_dcs_for(&self, dc_id: i32) -> impl Iterator<Item = &DcEntry> {
583        self.dcs.iter().filter(move |d| d.dc_id == dc_id)
584    }
585}
586
587impl std::fmt::Display for PersistedSession {
588    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
589        use base64::Engine as _;
590        f.write_str(&base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(self.to_bytes()))
591    }
592}
593
/// Built-in DC address table, keyed by DC ID (fallback if GetConfig fails).
///
/// Holds one IPv4 `"ip:port"` string for each of the DC IDs 1 to 5.
pub fn default_dc_addresses() -> HashMap<i32, String> {
    const BOOTSTRAP: [(i32, &str); 5] = [
        (1, "149.154.175.53:443"),
        (2, "149.154.167.51:443"),
        (3, "149.154.175.100:443"),
        (4, "149.154.167.91:443"),
        (5, "91.108.56.130:443"),
    ];

    let mut table = HashMap::with_capacity(BOOTSTRAP.len());
    for (dc_id, addr) in BOOTSTRAP {
        table.insert(dc_id, addr.to_owned());
    }
    table
}
607
608// session_backend
609//
610// Pluggable session storage backend.
611
612use std::path::PathBuf;
613
614// Core trait (unchanged)
615
616/// Synchronous snapshot backend: saves and loads the full session at once.
617///
618/// All built-in backends implement this. Higher-level code should prefer the
619/// extension methods below (`update_dc`, `set_home_dc`, `update_state`) which
620/// avoid unnecessary full-snapshot writes.
621pub trait SessionBackend: Send + Sync {
622    fn save(&self, session: &PersistedSession) -> io::Result<()>;
623    fn load(&self) -> io::Result<Option<PersistedSession>>;
624    fn delete(&self) -> io::Result<()>;
625
626    /// Human-readable name for logging/debug output.
627    fn name(&self) -> &str;
628
629    // Granular helpers (default: load → mutate → save)
630    //
631    // These default implementations are correct but not optimal.
632    // Backends that store data in a database (SQLite, libsql, Redis) should
633    // override them to issue single-row UPDATE statements instead.
634
635    /// Update a single DC entry without rewriting the entire session.
636    ///
637    /// Typically called after:
638    /// - completing a DH handshake on a new DC (to persist its auth key)
639    /// - receiving updated DC addresses from `help.getConfig`
640    ///
641    fn update_dc(&self, entry: &DcEntry) -> io::Result<()> {
642        let mut s = self.load()?.unwrap_or_default();
643        // Replace existing entry or append
644        if let Some(existing) = s
645            .dcs
646            .iter_mut()
647            .find(|d| d.dc_id == entry.dc_id && d.is_ipv6() == entry.is_ipv6())
648        {
649            *existing = entry.clone();
650        } else {
651            s.dcs.push(entry.clone());
652        }
653        self.save(&s)
654    }
655
656    /// Change the home DC without touching any other session data.
657    ///
658    /// Called after a successful `*_MIGRATE` redirect: the user's account
659    /// now lives on a different DC.
660    ///
661    fn set_home_dc(&self, dc_id: i32) -> io::Result<()> {
662        let mut s = self.load()?.unwrap_or_default();
663        s.home_dc_id = dc_id;
664        self.save(&s)
665    }
666
667    /// Apply a single update-sequence change without a full save/load.
668    ///
669    ///
670    /// `update` is the new partial or full state to merge in.
671    fn apply_update_state(&self, update: UpdateStateChange) -> io::Result<()> {
672        let mut s = self.load()?.unwrap_or_default();
673        update.apply_to(&mut s.updates_state);
674        self.save(&s)
675    }
676
677    /// Cache a peer access hash without a full session save.
678    ///
679    /// This is lossy-on-default (full round-trip) but correct.
680    /// Override in SQL backends to issue a single `INSERT OR REPLACE`.
681    ///
682    fn cache_peer(&self, peer: &CachedPeer) -> io::Result<()> {
683        let mut s = self.load()?.unwrap_or_default();
684        if let Some(existing) = s.peers.iter_mut().find(|p| p.id == peer.id) {
685            *existing = peer.clone();
686        } else {
687            s.peers.push(peer.clone());
688        }
689        self.save(&s)
690    }
691}
692
693// UpdateStateChange (mirrors  UpdateState enum)
694
695/// A single update-sequence change, applied via [`SessionBackend::apply_update_state`].
696///
697///uses:
698/// ```text
699/// UpdateState::All(updates_state)
700/// UpdateState::Primary { pts, date, seq }
701/// UpdateState::Secondary { qts }
702/// UpdateState::Channel { id, pts }
703/// ```
704///
705/// We map this 1-to-1 to layer's `UpdatesStateSnap`.
706#[derive(Debug, Clone)]
707pub enum UpdateStateChange {
708    /// Replace the entire state snapshot.
709    All(UpdatesStateSnap),
710    /// Update main sequence counters only (non-channel).
711    Primary { pts: i32, date: i32, seq: i32 },
712    /// Update the QTS counter (secret chats).
713    Secondary { qts: i32 },
714    /// Update the PTS for a specific channel.
715    Channel { id: i64, pts: i32 },
716}
717
718impl UpdateStateChange {
719    /// Apply `self` to `snap` in-place.
720    pub fn apply_to(&self, snap: &mut UpdatesStateSnap) {
721        match self {
722            Self::All(new_snap) => *snap = new_snap.clone(),
723            Self::Primary { pts, date, seq } => {
724                snap.pts = *pts;
725                snap.date = *date;
726                snap.seq = *seq;
727            }
728            Self::Secondary { qts } => {
729                snap.qts = *qts;
730            }
731            Self::Channel { id, pts } => {
732                // Replace or insert per-channel pts
733                if let Some(existing) = snap.channels.iter_mut().find(|c| c.0 == *id) {
734                    existing.1 = *pts;
735                } else {
736                    snap.channels.push((*id, *pts));
737                }
738            }
739        }
740    }
741}
742
743// BinaryFileBackend
744
/// Stores the session in a single compact binary file, written in the format
/// produced by `PersistedSession::to_bytes`.
pub struct BinaryFileBackend {
    /// Location of the session file.
    path: PathBuf,
    /// Serialises concurrent save() calls within the same process.
    /// Belt-and-suspenders: `PersistedSession::save` already uses a unique
    /// temporary file name per call.
    write_lock: std::sync::Mutex<()>,
}

impl BinaryFileBackend {
    /// Create a backend for the session file at `path`. Nothing is read or
    /// written until the backend is used.
    pub fn new(path: impl Into<PathBuf>) -> Self {
        let path = path.into();
        let write_lock = std::sync::Mutex::new(());
        Self { path, write_lock }
    }

    /// Location of the session file.
    pub fn path(&self) -> &std::path::Path {
        self.path.as_path()
    }
}
767
768impl SessionBackend for BinaryFileBackend {
769    fn save(&self, session: &PersistedSession) -> io::Result<()> {
770        let _guard = self.write_lock.lock().unwrap();
771        session.save(&self.path)
772    }
773
774    fn load(&self) -> io::Result<Option<PersistedSession>> {
775        if !self.path.exists() {
776            return Ok(None);
777        }
778        match PersistedSession::load(&self.path) {
779            Ok(s) => Ok(Some(s)),
780            Err(e) => {
781                let bak = self.path.with_extension("bak");
782                tracing::warn!(
783                    "[ferogram] Session file {:?} is corrupt ({e}); \
784                     renaming to {:?} and starting fresh",
785                    self.path,
786                    bak
787                );
788                let _ = std::fs::rename(&self.path, &bak);
789                Ok(None)
790            }
791        }
792    }
793
794    fn delete(&self) -> io::Result<()> {
795        if self.path.exists() {
796            std::fs::remove_file(&self.path)?;
797        }
798        Ok(())
799    }
800
801    fn name(&self) -> &str {
802        "binary-file"
803    }
804
805    // BinaryFileBackend: the default granular impls (load→mutate→save) are
806    // fine since the format is a single compact binary blob. No override needed.
807}
808
809// InMemoryBackend
810
811/// Ephemeral in-process session: nothing persisted to disk.
812///
813/// Override the granular methods to skip the clone overhead of the full
814/// snapshot path (we're already in memory, so direct field mutations are
815/// cheaper than clone→mutate→replace).
816#[derive(Default)]
817pub struct InMemoryBackend {
818    data: std::sync::Mutex<Option<PersistedSession>>,
819}
820
821impl InMemoryBackend {
822    pub fn new() -> Self {
823        Self::default()
824    }
825
826    /// Test helper: get a snapshot of the current in-memory state.
827    pub fn snapshot(&self) -> Option<PersistedSession> {
828        self.data.lock().unwrap().clone()
829    }
830}
831
832impl SessionBackend for InMemoryBackend {
833    fn save(&self, s: &PersistedSession) -> io::Result<()> {
834        *self.data.lock().unwrap() = Some(s.clone());
835        Ok(())
836    }
837
838    fn load(&self) -> io::Result<Option<PersistedSession>> {
839        Ok(self.data.lock().unwrap().clone())
840    }
841
842    fn delete(&self) -> io::Result<()> {
843        *self.data.lock().unwrap() = None;
844        Ok(())
845    }
846
847    fn name(&self) -> &str {
848        "in-memory"
849    }
850
851    // Granular overrides: cheaper than load→clone→save
852
853    fn update_dc(&self, entry: &DcEntry) -> io::Result<()> {
854        let mut guard = self.data.lock().unwrap();
855        let s = guard.get_or_insert_with(PersistedSession::default);
856        if let Some(existing) = s
857            .dcs
858            .iter_mut()
859            .find(|d| d.dc_id == entry.dc_id && d.is_ipv6() == entry.is_ipv6())
860        {
861            *existing = entry.clone();
862        } else {
863            s.dcs.push(entry.clone());
864        }
865        Ok(())
866    }
867
868    fn set_home_dc(&self, dc_id: i32) -> io::Result<()> {
869        let mut guard = self.data.lock().unwrap();
870        let s = guard.get_or_insert_with(PersistedSession::default);
871        s.home_dc_id = dc_id;
872        Ok(())
873    }
874
875    fn apply_update_state(&self, update: UpdateStateChange) -> io::Result<()> {
876        let mut guard = self.data.lock().unwrap();
877        let s = guard.get_or_insert_with(PersistedSession::default);
878        update.apply_to(&mut s.updates_state);
879        Ok(())
880    }
881
882    fn cache_peer(&self, peer: &CachedPeer) -> io::Result<()> {
883        let mut guard = self.data.lock().unwrap();
884        let s = guard.get_or_insert_with(PersistedSession::default);
885        if let Some(existing) = s.peers.iter_mut().find(|p| p.id == peer.id) {
886            *existing = peer.clone();
887        } else {
888            s.peers.push(peer.clone());
889        }
890        Ok(())
891    }
892}
893
894// StringSessionBackend
895
/// Portable base64 string session backend.
///
/// The backend owns a single string. Saving replaces it with the session's
/// string form, and [`StringSessionBackend::current`] returns a copy of it so
/// the caller can store it wherever it likes. An empty or whitespace-only
/// string loads as "no session".
pub struct StringSessionBackend {
    // Serialized session text; guarded so the backend can be used through `&self`.
    data: std::sync::Mutex<String>,
}
900
901impl StringSessionBackend {
902    pub fn new(s: impl Into<String>) -> Self {
903        Self {
904            data: std::sync::Mutex::new(s.into()),
905        }
906    }
907
908    pub fn current(&self) -> String {
909        self.data.lock().unwrap().clone()
910    }
911}
912
913impl SessionBackend for StringSessionBackend {
914    fn save(&self, session: &PersistedSession) -> io::Result<()> {
915        *self.data.lock().unwrap() = session.to_string();
916        Ok(())
917    }
918
919    fn load(&self) -> io::Result<Option<PersistedSession>> {
920        let s = self.data.lock().unwrap().clone();
921        if s.trim().is_empty() {
922            return Ok(None);
923        }
924        PersistedSession::from_string(&s).map(Some)
925    }
926
927    fn delete(&self) -> io::Result<()> {
928        *self.data.lock().unwrap() = String::new();
929        Ok(())
930    }
931
932    fn name(&self) -> &str {
933        "string-session"
934    }
935}
936
937// Tests
938
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an IPv4 DC entry with no auth key.
    fn make_dc(id: i32) -> DcEntry {
        DcEntry {
            dc_id: id,
            addr: format!("1.2.3.{id}:443"),
            auth_key: None,
            first_salt: 0,
            time_offset: 0,
            flags: DcFlags::NONE,
        }
    }

    /// Builds a cached peer that is neither a channel nor a chat.
    fn make_peer(id: i64, hash: i64) -> CachedPeer {
        CachedPeer {
            id,
            access_hash: hash,
            is_channel: false,
            is_chat: false,
        }
    }

    // InMemoryBackend: basic save/load

    #[test]
    fn inmemory_load_returns_none_when_empty() {
        let b = InMemoryBackend::new();
        assert!(b.load().unwrap().is_none());
    }

    #[test]
    fn inmemory_save_then_load_round_trips() {
        let b = InMemoryBackend::new();
        let mut s = PersistedSession::default();
        s.home_dc_id = 3;
        s.dcs.push(make_dc(3));
        b.save(&s).unwrap();

        let loaded = b.load().unwrap().unwrap();
        assert_eq!(loaded.home_dc_id, 3);
        assert_eq!(loaded.dcs.len(), 1);
    }

    #[test]
    fn inmemory_delete_clears_state() {
        let b = InMemoryBackend::new();
        let mut s = PersistedSession::default();
        s.home_dc_id = 2;
        b.save(&s).unwrap();
        b.delete().unwrap();
        assert!(b.load().unwrap().is_none());
    }

    #[test]
    fn inmemory_name_is_stable() {
        assert_eq!(InMemoryBackend::new().name(), "in-memory");
    }

    // InMemoryBackend: granular methods

    #[test]
    fn inmemory_update_dc_inserts_new() {
        let b = InMemoryBackend::new();
        b.update_dc(&make_dc(4)).unwrap();
        let s = b.snapshot().unwrap();
        assert_eq!(s.dcs.len(), 1);
        assert_eq!(s.dcs[0].dc_id, 4);
    }

    #[test]
    fn inmemory_update_dc_replaces_existing() {
        let b = InMemoryBackend::new();
        b.update_dc(&make_dc(2)).unwrap();
        let mut updated = make_dc(2);
        updated.addr = "9.9.9.9:443".to_string();
        b.update_dc(&updated).unwrap();

        let s = b.snapshot().unwrap();
        assert_eq!(s.dcs.len(), 1);
        assert_eq!(s.dcs[0].addr, "9.9.9.9:443");
    }

    #[test]
    fn inmemory_set_home_dc() {
        let b = InMemoryBackend::new();
        b.set_home_dc(5).unwrap();
        assert_eq!(b.snapshot().unwrap().home_dc_id, 5);
    }

    #[test]
    fn inmemory_cache_peer_inserts() {
        let b = InMemoryBackend::new();
        b.cache_peer(&make_peer(100, 0xdeadbeef)).unwrap();
        let s = b.snapshot().unwrap();
        assert_eq!(s.peers.len(), 1);
        assert_eq!(s.peers[0].id, 100);
    }

    #[test]
    fn inmemory_cache_peer_updates_existing() {
        let b = InMemoryBackend::new();
        b.cache_peer(&make_peer(100, 111)).unwrap();
        b.cache_peer(&make_peer(100, 222)).unwrap();
        let s = b.snapshot().unwrap();
        assert_eq!(s.peers.len(), 1);
        assert_eq!(s.peers[0].access_hash, 222);
    }

    // StringSessionBackend

    #[test]
    fn string_backend_load_returns_none_when_empty() {
        assert!(StringSessionBackend::new("").load().unwrap().is_none());
        // Whitespace-only input is treated the same as an empty string.
        assert!(StringSessionBackend::new(" \n\t").load().unwrap().is_none());
    }

    #[test]
    fn string_backend_save_updates_current() {
        let b = StringSessionBackend::new("");
        let mut s = PersistedSession::default();
        s.home_dc_id = 4;
        b.save(&s).unwrap();
        assert_eq!(b.current(), s.to_string());
    }

    #[test]
    fn string_backend_delete_clears_state() {
        let b = StringSessionBackend::new("placeholder");
        b.delete().unwrap();
        assert_eq!(b.current(), "");
        assert!(b.load().unwrap().is_none());
    }

    #[test]
    fn string_backend_name_is_stable() {
        assert_eq!(StringSessionBackend::new("").name(), "string-session");
    }

    // UpdateStateChange

    #[test]
    fn update_state_primary() {
        let mut snap = UpdatesStateSnap {
            pts: 0,
            qts: 0,
            date: 0,
            seq: 0,
            channels: vec![],
        };
        UpdateStateChange::Primary {
            pts: 10,
            date: 20,
            seq: 30,
        }
        .apply_to(&mut snap);
        assert_eq!(snap.pts, 10);
        assert_eq!(snap.date, 20);
        assert_eq!(snap.seq, 30);
        assert_eq!(snap.qts, 0); // untouched
    }

    #[test]
    fn update_state_secondary() {
        let mut snap = UpdatesStateSnap {
            pts: 5,
            qts: 0,
            date: 0,
            seq: 0,
            channels: vec![],
        };
        UpdateStateChange::Secondary { qts: 99 }.apply_to(&mut snap);
        assert_eq!(snap.qts, 99);
        assert_eq!(snap.pts, 5); // untouched
    }

    #[test]
    fn update_state_channel_inserts() {
        let mut snap = UpdatesStateSnap {
            pts: 0,
            qts: 0,
            date: 0,
            seq: 0,
            channels: vec![],
        };
        UpdateStateChange::Channel { id: 12345, pts: 42 }.apply_to(&mut snap);
        assert_eq!(snap.channels, vec![(12345, 42)]);
    }

    #[test]
    fn update_state_channel_updates_existing() {
        let mut snap = UpdatesStateSnap {
            pts: 0,
            qts: 0,
            date: 0,
            seq: 0,
            channels: vec![(12345, 10), (67890, 5)],
        };
        UpdateStateChange::Channel { id: 12345, pts: 99 }.apply_to(&mut snap);
        // First channel updated, second untouched
        assert_eq!(snap.channels[0], (12345, 99));
        assert_eq!(snap.channels[1], (67890, 5));
    }

    #[test]
    fn apply_update_state_via_backend() {
        let b = InMemoryBackend::new();
        b.apply_update_state(UpdateStateChange::Primary {
            pts: 7,
            date: 8,
            seq: 9,
        })
        .unwrap();
        let s = b.snapshot().unwrap();
        // All three fields carried by `Primary` must land in the stored state.
        assert_eq!(s.updates_state.pts, 7);
        assert_eq!(s.updates_state.date, 8);
        assert_eq!(s.updates_state.seq, 9);
    }

    // Trait-object dispatch. InMemoryBackend overrides `update_dc`, so this is
    // a smoke test of the granular API through `dyn SessionBackend`, not of
    // the trait's default implementation.

    #[test]
    fn default_update_dc_via_trait_object() {
        let b: Box<dyn SessionBackend> = Box::new(InMemoryBackend::new());
        b.update_dc(&make_dc(1)).unwrap();
        b.update_dc(&make_dc(2)).unwrap();
        // Can't call snapshot() on trait object, but save/load must be consistent
        let loaded = b.load().unwrap().unwrap();
        assert_eq!(loaded.dcs.len(), 2);
    }

    // IPv6 tests

    /// Builds an IPv6 DC entry with no auth key.
    fn make_dc_v6(id: i32) -> DcEntry {
        DcEntry {
            dc_id: id,
            addr: format!("[2001:b28:f23d:f00{}::a]:443", id),
            auth_key: None,
            first_salt: 0,
            time_offset: 0,
            flags: DcFlags::IPV6,
        }
    }

    #[test]
    fn dc_entry_from_parts_ipv4() {
        let dc = DcEntry::from_parts(1, "149.154.175.53", 443, DcFlags::NONE);
        assert_eq!(dc.addr, "149.154.175.53:443");
        assert!(!dc.is_ipv6());
        let sa = dc.socket_addr().unwrap();
        assert_eq!(sa.port(), 443);
    }

    #[test]
    fn dc_entry_from_parts_ipv6() {
        let dc = DcEntry::from_parts(2, "2001:b28:f23d:f001::a", 443, DcFlags::IPV6);
        assert_eq!(dc.addr, "[2001:b28:f23d:f001::a]:443");
        assert!(dc.is_ipv6());
        let sa = dc.socket_addr().unwrap();
        assert_eq!(sa.port(), 443);
    }

    #[test]
    fn persisted_session_dc_for_prefers_ipv6() {
        let mut s = PersistedSession::default();
        s.dcs.push(make_dc(2)); // IPv4
        s.dcs.push(make_dc_v6(2)); // IPv6

        let v6 = s.dc_for(2, true).unwrap();
        assert!(v6.is_ipv6());

        let v4 = s.dc_for(2, false).unwrap();
        assert!(!v4.is_ipv6());
    }

    #[test]
    fn persisted_session_dc_for_falls_back_when_only_ipv4() {
        let mut s = PersistedSession::default();
        s.dcs.push(make_dc(3)); // IPv4 only

        // Asking for IPv6 should fall back to IPv4
        let dc = s.dc_for(3, true).unwrap();
        assert!(!dc.is_ipv6());
    }

    #[test]
    fn persisted_session_all_dcs_for_returns_both() {
        let mut s = PersistedSession::default();
        s.dcs.push(make_dc(1));
        s.dcs.push(make_dc_v6(1));
        s.dcs.push(make_dc(2));

        assert_eq!(s.all_dcs_for(1).count(), 2);
        assert_eq!(s.all_dcs_for(2).count(), 1);
        assert_eq!(s.all_dcs_for(5).count(), 0);
    }

    #[test]
    fn inmemory_ipv4_and_ipv6_coexist() {
        let b = InMemoryBackend::new();
        b.update_dc(&make_dc(2)).unwrap(); // IPv4
        b.update_dc(&make_dc_v6(2)).unwrap(); // IPv6

        let s = b.snapshot().unwrap();
        // Both entries must survive: they share a DC id but differ in flags.
        assert_eq!(s.dcs.iter().filter(|d| d.dc_id == 2).count(), 2);
    }

    #[test]
    fn binary_roundtrip_ipv4_and_ipv6() {
        let mut s = PersistedSession::default();
        s.home_dc_id = 2;
        s.dcs.push(make_dc(2));
        s.dcs.push(make_dc_v6(2));

        let bytes = s.to_bytes();
        let loaded = PersistedSession::from_bytes(&bytes).unwrap();
        assert_eq!(loaded.dcs.len(), 2);
        assert_eq!(loaded.dcs.iter().filter(|d| d.is_ipv6()).count(), 1);
        assert_eq!(loaded.dcs.iter().filter(|d| !d.is_ipv6()).count(), 1);
    }
}
1225
1226// SqliteBackend
1227
/// SQLite-backed session (via `rusqlite`).
///
/// Enabled with the `sqlite-session` Cargo feature.
///
/// # Schema
///
/// Six tables are created on first open (idempotent):
///
/// | Table          | Purpose                                          |
/// |----------------|--------------------------------------------------|
/// | `meta`         | `home_dc_id` and future scalar values            |
/// | `dcs`          | One row per DC (auth key, address, flags, ...)   |
/// | `update_state` | Single-row pts / qts / date / seq                |
/// | `channel_pts`  | Per-channel pts                                  |
/// | `peers`        | Access-hash cache (includes is_chat flag)        |
/// | `min_peers`    | Min-user message contexts                        |
///
/// # Granular writes
///
/// The [`SessionBackend`] extension methods (`update_dc`, `set_home_dc`,
/// `apply_update_state`, `cache_peer`) issue **targeted SQL statements**
/// instead of the default load-mutate-save round-trip. Most calls are a
/// single-row upsert; `UpdateStateChange::All` additionally rewrites the
/// `channel_pts` table. They are intended to be cheap enough to call
/// frequently (e.g. on every update batch).
#[cfg(feature = "sqlite-session")]
pub struct SqliteBackend {
    // The single connection used for every operation; the mutex serializes
    // access so the backend can be driven through `&self`.
    conn: std::sync::Mutex<rusqlite::Connection>,
    // Returned by `name()`: the database path, or ":memory:".
    label: String,
}
1256
#[cfg(feature = "sqlite-session")]
impl SqliteBackend {
    /// Pragmas and DDL executed every time a database is opened.
    ///
    /// Every `CREATE TABLE` uses `IF NOT EXISTS`, so running the batch against
    /// an existing database leaves its tables untouched.
    const SCHEMA: &'static str = "
        PRAGMA journal_mode = WAL;
        PRAGMA synchronous  = NORMAL;

        CREATE TABLE IF NOT EXISTS meta (
            key   TEXT    PRIMARY KEY,
            value INTEGER NOT NULL DEFAULT 0
        );

        CREATE TABLE IF NOT EXISTS dcs (
            dc_id       INTEGER NOT NULL,
            flags       INTEGER NOT NULL DEFAULT 0,
            addr        TEXT    NOT NULL,
            auth_key    BLOB,
            first_salt  INTEGER NOT NULL DEFAULT 0,
            time_offset INTEGER NOT NULL DEFAULT 0,
            PRIMARY KEY (dc_id, flags)
        );

        CREATE TABLE IF NOT EXISTS update_state (
            id   INTEGER PRIMARY KEY CHECK (id = 1),
            pts  INTEGER NOT NULL DEFAULT 0,
            qts  INTEGER NOT NULL DEFAULT 0,
            date INTEGER NOT NULL DEFAULT 0,
            seq  INTEGER NOT NULL DEFAULT 0
        );

        CREATE TABLE IF NOT EXISTS channel_pts (
            channel_id INTEGER PRIMARY KEY,
            pts        INTEGER NOT NULL
        );

        CREATE TABLE IF NOT EXISTS peers (
            id           INTEGER PRIMARY KEY,
            access_hash  INTEGER NOT NULL,
            is_channel   INTEGER NOT NULL DEFAULT 0,
            is_chat      INTEGER NOT NULL DEFAULT 0
        );

        CREATE TABLE IF NOT EXISTS min_peers (
            user_id INTEGER PRIMARY KEY,
            peer_id INTEGER NOT NULL,
            msg_id  INTEGER NOT NULL
        );
    ";

    /// Open (or create) the SQLite database at `path`.
    ///
    /// # Errors
    ///
    /// Returns an error if the database cannot be opened or if applying the
    /// schema / legacy migration fails.
    pub fn open(path: impl Into<PathBuf>) -> io::Result<Self> {
        let path = path.into();
        let label = path.display().to_string();
        let conn = rusqlite::Connection::open(&path).map_err(Self::map_err)?;
        Self::from_connection(conn, label)
    }

    /// Open an in-process SQLite database (useful for tests).
    ///
    /// # Errors
    ///
    /// Returns an error if the in-memory database cannot be created or if
    /// applying the schema fails.
    pub fn in_memory() -> io::Result<Self> {
        let conn = rusqlite::Connection::open_in_memory().map_err(Self::map_err)?;
        Self::from_connection(conn, ":memory:".into())
    }

    /// Shared tail of the constructors: apply the schema, run the legacy
    /// migration, and wrap the connection.
    fn from_connection(conn: rusqlite::Connection, label: String) -> io::Result<Self> {
        conn.execute_batch(Self::SCHEMA).map_err(Self::map_err)?;
        Self::migrate_legacy_sqlite_schema(&conn)?;
        Ok(Self {
            conn: std::sync::Mutex::new(conn),
            label,
        })
    }

    /// Wraps a `rusqlite` error in an [`io::Error`].
    fn map_err(e: rusqlite::Error) -> io::Error {
        io::Error::other(e)
    }

    /// Turns "the query matched no row" into `Ok(None)` while still
    /// propagating every other database error.
    fn optional_row<T>(res: Result<T, rusqlite::Error>) -> io::Result<Option<T>> {
        match res {
            Ok(value) => Ok(Some(value)),
            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
            Err(e) => Err(Self::map_err(e)),
        }
    }

    /// Migrate an older database that is missing the is_chat column or the
    /// min_peers table. Safe to call on a fresh database; both operations
    /// are no-ops if the schema is already current.
    fn migrate_legacy_sqlite_schema(conn: &rusqlite::Connection) -> io::Result<()> {
        let mut stmt = conn
            .prepare("PRAGMA table_info(peers)")
            .map_err(Self::map_err)?;
        // Result column 1 is read as the column name. Row errors are
        // propagated: silently skipping one could hide an existing `is_chat`
        // column and make the ALTER below fail with a confusing message.
        let columns = stmt
            .query_map([], |row| row.get::<_, String>(1))
            .map_err(Self::map_err)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(Self::map_err)?;
        drop(stmt);

        if !columns.iter().any(|name| name == "is_chat") {
            conn.execute_batch("ALTER TABLE peers ADD COLUMN is_chat INTEGER NOT NULL DEFAULT 0;")
                .map_err(Self::map_err)?;
        }
        conn.execute_batch(
            "CREATE TABLE IF NOT EXISTS min_peers (
                user_id INTEGER PRIMARY KEY,
                peer_id INTEGER NOT NULL,
                msg_id  INTEGER NOT NULL
            );",
        )
        .map_err(Self::map_err)?;
        Ok(())
    }

    /// Read the full session out of the database.
    ///
    /// Missing single-row values (`home_dc_id`, `update_state`) fall back to
    /// their defaults.
    ///
    /// # Errors
    ///
    /// Any other database or row-decoding error is returned rather than
    /// skipped. `save` replaces whole tables, so a session that was loaded
    /// with rows silently dropped would erase those rows on the next save.
    fn read_session(conn: &rusqlite::Connection) -> io::Result<PersistedSession> {
        // home_dc_id: no row means it was never set.
        let home_dc_id = Self::optional_row(conn.query_row(
            "SELECT value FROM meta WHERE key = 'home_dc_id'",
            [],
            |r| r.get::<_, i32>(0),
        ))?
        .unwrap_or(0);

        // dcs
        let mut dc_stmt = conn
            .prepare("SELECT dc_id, flags, addr, auth_key, first_salt, time_offset FROM dcs")
            .map_err(Self::map_err)?;
        let dcs = dc_stmt
            .query_map([], |row| {
                let key_blob: Option<Vec<u8>> = row.get(3)?;
                // A blob that is not exactly 256 bytes is treated as "no key".
                let auth_key = key_blob.and_then(|b| {
                    if b.len() == 256 {
                        let mut k = [0u8; 256];
                        k.copy_from_slice(&b);
                        Some(k)
                    } else {
                        None
                    }
                });
                Ok(DcEntry {
                    dc_id: row.get(0)?,
                    addr: row.get(2)?,
                    auth_key,
                    first_salt: row.get(4)?,
                    time_offset: row.get(5)?,
                    flags: DcFlags(row.get::<_, u8>(1)?),
                })
            })
            .map_err(Self::map_err)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(Self::map_err)?;

        // update_state (channels are filled in from `channel_pts` below)
        let updates_state = Self::optional_row(conn.query_row(
            "SELECT pts, qts, date, seq FROM update_state WHERE id = 1",
            [],
            |r| {
                Ok(UpdatesStateSnap {
                    pts: r.get(0)?,
                    qts: r.get(1)?,
                    date: r.get(2)?,
                    seq: r.get(3)?,
                    channels: vec![],
                })
            },
        ))?
        .unwrap_or_default();

        // channel_pts
        let mut ch_stmt = conn
            .prepare("SELECT channel_id, pts FROM channel_pts")
            .map_err(Self::map_err)?;
        let channels: Vec<(i64, i32)> = ch_stmt
            .query_map([], |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i32>(1)?)))
            .map_err(Self::map_err)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(Self::map_err)?;

        // peers
        let mut peer_stmt = conn
            .prepare("SELECT id, access_hash, is_channel, is_chat FROM peers")
            .map_err(Self::map_err)?;
        let peers: Vec<CachedPeer> = peer_stmt
            .query_map([], |r| {
                Ok(CachedPeer {
                    id: r.get(0)?,
                    access_hash: r.get(1)?,
                    is_channel: r.get::<_, i32>(2)? != 0,
                    is_chat: r.get::<_, i32>(3)? != 0,
                })
            })
            .map_err(Self::map_err)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(Self::map_err)?;

        // min_peers
        let mut min_stmt = conn
            .prepare("SELECT user_id, peer_id, msg_id FROM min_peers")
            .map_err(Self::map_err)?;
        let min_peers: Vec<CachedMinPeer> = min_stmt
            .query_map([], |r| {
                Ok(CachedMinPeer {
                    user_id: r.get(0)?,
                    peer_id: r.get(1)?,
                    msg_id: r.get(2)?,
                })
            })
            .map_err(Self::map_err)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(Self::map_err)?;

        Ok(PersistedSession {
            home_dc_id,
            dcs,
            updates_state: UpdatesStateSnap {
                channels,
                ..updates_state
            },
            peers,
            min_peers,
        })
    }

    /// Write the full session into the database inside a single transaction.
    ///
    /// # Errors
    ///
    /// If any statement (including the final `COMMIT`) fails, the transaction
    /// is rolled back before the error is returned, so the connection is never
    /// left inside a half-finished transaction.
    fn write_session(conn: &rusqlite::Connection, s: &PersistedSession) -> io::Result<()> {
        conn.execute_batch("BEGIN IMMEDIATE")
            .map_err(Self::map_err)?;

        let outcome = Self::write_session_rows(conn, s)
            .and_then(|()| conn.execute_batch("COMMIT").map_err(Self::map_err));
        if outcome.is_err() {
            // Best effort: the original error is what the caller needs to see.
            let _ = conn.execute_batch("ROLLBACK");
        }
        outcome
    }

    /// Issues every write that makes up a full save. Must run inside the
    /// transaction opened by [`Self::write_session`].
    fn write_session_rows(conn: &rusqlite::Connection, s: &PersistedSession) -> io::Result<()> {
        conn.execute(
            "INSERT INTO meta (key, value) VALUES ('home_dc_id', ?1)
             ON CONFLICT(key) DO UPDATE SET value = excluded.value",
            rusqlite::params![s.home_dc_id],
        )
        .map_err(Self::map_err)?;

        // Replace all DCs. Each INSERT is prepared once and reused per row.
        conn.execute("DELETE FROM dcs", []).map_err(Self::map_err)?;
        let mut insert_dc = conn
            .prepare(
                "INSERT INTO dcs (dc_id, flags, addr, auth_key, first_salt, time_offset)
                 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
            )
            .map_err(Self::map_err)?;
        for d in &s.dcs {
            insert_dc
                .execute(rusqlite::params![
                    d.dc_id,
                    d.flags.0,
                    d.addr,
                    d.auth_key.as_ref().map(|k| k.as_ref()),
                    d.first_salt,
                    d.time_offset,
                ])
                .map_err(Self::map_err)?;
        }

        // update_state: pts and qts are monotonic, so a full save must never
        // move them backwards. MAX() ensures a stale snapshot cannot overwrite
        // a fresher value committed by apply_update_state().
        let us = &s.updates_state;
        conn.execute(
            "INSERT INTO update_state (id, pts, qts, date, seq) VALUES (1, ?1, ?2, ?3, ?4)
             ON CONFLICT(id) DO UPDATE SET
               pts  = MAX(excluded.pts,  update_state.pts),
               qts  = MAX(excluded.qts,  update_state.qts),
               date = excluded.date,
               seq  = excluded.seq",
            rusqlite::params![us.pts, us.qts, us.date, us.seq],
        )
        .map_err(Self::map_err)?;

        conn.execute("DELETE FROM channel_pts", [])
            .map_err(Self::map_err)?;
        let mut insert_channel = conn
            .prepare("INSERT INTO channel_pts (channel_id, pts) VALUES (?1, ?2)")
            .map_err(Self::map_err)?;
        for &(cid, cpts) in &us.channels {
            insert_channel
                .execute(rusqlite::params![cid, cpts])
                .map_err(Self::map_err)?;
        }

        // peers
        conn.execute("DELETE FROM peers", [])
            .map_err(Self::map_err)?;
        let mut insert_peer = conn
            .prepare(
                "INSERT INTO peers (id, access_hash, is_channel, is_chat) VALUES (?1, ?2, ?3, ?4)",
            )
            .map_err(Self::map_err)?;
        for p in &s.peers {
            insert_peer
                .execute(rusqlite::params![
                    p.id,
                    p.access_hash,
                    p.is_channel as i32,
                    p.is_chat as i32
                ])
                .map_err(Self::map_err)?;
        }

        // min_peers
        conn.execute("DELETE FROM min_peers", [])
            .map_err(Self::map_err)?;
        let mut insert_min = conn
            .prepare("INSERT INTO min_peers (user_id, peer_id, msg_id) VALUES (?1, ?2, ?3)")
            .map_err(Self::map_err)?;
        for m in &s.min_peers {
            insert_min
                .execute(rusqlite::params![m.user_id, m.peer_id, m.msg_id])
                .map_err(Self::map_err)?;
        }

        Ok(())
    }
}
1565
#[cfg(feature = "sqlite-session")]
impl SessionBackend for SqliteBackend {
    /// Writes the full session in one transaction (see `write_session`).
    fn save(&self, session: &PersistedSession) -> io::Result<()> {
        let conn = self.conn.lock().unwrap();
        Self::write_session(&conn, session)
    }

    /// Reads the full session, or returns `Ok(None)` when nothing is stored.
    ///
    /// "Nothing stored" means every session table is empty. Checking only
    /// `meta` is not enough: `update_dc`, `apply_update_state` and
    /// `cache_peer` write to other tables without touching `meta`, and data
    /// they persisted must still be loadable.
    fn load(&self) -> io::Result<Option<PersistedSession>> {
        let conn = self.conn.lock().unwrap();
        let has_data: bool = conn
            .query_row(
                "SELECT EXISTS(SELECT 1 FROM meta)
                     OR EXISTS(SELECT 1 FROM dcs)
                     OR EXISTS(SELECT 1 FROM update_state)
                     OR EXISTS(SELECT 1 FROM channel_pts)
                     OR EXISTS(SELECT 1 FROM peers)
                     OR EXISTS(SELECT 1 FROM min_peers)",
                [],
                |r| r.get(0),
            )
            .map_err(Self::map_err)?;
        if !has_data {
            return Ok(None);
        }
        Self::read_session(&conn).map(Some)
    }

    /// Empties every session table inside one transaction.
    ///
    /// If the batch fails part-way, the transaction is rolled back so the
    /// connection is not left inside an open transaction.
    fn delete(&self) -> io::Result<()> {
        let conn = self.conn.lock().unwrap();
        let outcome = conn
            .execute_batch(
                "BEGIN IMMEDIATE;
             DELETE FROM meta;
             DELETE FROM dcs;
             DELETE FROM update_state;
             DELETE FROM channel_pts;
             DELETE FROM peers;
             DELETE FROM min_peers;
             COMMIT;",
            )
            .map_err(Self::map_err);
        if outcome.is_err() {
            // Best effort; fails harmlessly when no transaction is active.
            let _ = conn.execute_batch("ROLLBACK");
        }
        outcome
    }

    /// Returns the label chosen when the backend was opened.
    fn name(&self) -> &str {
        &self.label
    }

    // Granular overrides (targeted SQL, no full round-trip)

    /// Upserts the row keyed by `(dc_id, flags)`.
    fn update_dc(&self, entry: &DcEntry) -> io::Result<()> {
        let conn = self.conn.lock().unwrap();
        conn.execute(
            "INSERT INTO dcs (dc_id, flags, addr, auth_key, first_salt, time_offset)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6)
             ON CONFLICT(dc_id, flags) DO UPDATE SET
               addr        = excluded.addr,
               auth_key    = excluded.auth_key,
               first_salt  = excluded.first_salt,
               time_offset = excluded.time_offset",
            rusqlite::params![
                entry.dc_id,
                entry.flags.0,
                entry.addr,
                entry.auth_key.as_ref().map(|k| k.as_ref()),
                entry.first_salt,
                entry.time_offset,
            ],
        )
        .map(|_| ())
        .map_err(Self::map_err)
    }

    /// Upserts the `home_dc_id` row of `meta`.
    fn set_home_dc(&self, dc_id: i32) -> io::Result<()> {
        let conn = self.conn.lock().unwrap();
        conn.execute(
            "INSERT INTO meta (key, value) VALUES ('home_dc_id', ?1)
             ON CONFLICT(key) DO UPDATE SET value = excluded.value",
            rusqlite::params![dc_id],
        )
        .map(|_| ())
        .map_err(Self::map_err)
    }

    /// Persists one update-state change.
    ///
    /// `Primary`, `Secondary` and `Channel` are single upserts. `All` replaces
    /// the scalar state and the whole `channel_pts` table; it runs inside a
    /// transaction so a failure cannot leave `channel_pts` half-rewritten.
    fn apply_update_state(&self, update: UpdateStateChange) -> io::Result<()> {
        let conn = self.conn.lock().unwrap();
        match update {
            UpdateStateChange::All(snap) => {
                conn.execute_batch("BEGIN IMMEDIATE")
                    .map_err(Self::map_err)?;
                let write_all = || -> io::Result<()> {
                    conn.execute(
                        "INSERT INTO update_state (id, pts, qts, date, seq) VALUES (1,?1,?2,?3,?4)
                     ON CONFLICT(id) DO UPDATE SET
                       pts=excluded.pts, qts=excluded.qts,
                       date=excluded.date, seq=excluded.seq",
                        rusqlite::params![snap.pts, snap.qts, snap.date, snap.seq],
                    )
                    .map_err(Self::map_err)?;
                    conn.execute("DELETE FROM channel_pts", [])
                        .map_err(Self::map_err)?;
                    for &(cid, cpts) in &snap.channels {
                        conn.execute(
                            "INSERT INTO channel_pts (channel_id, pts) VALUES (?1, ?2)",
                            rusqlite::params![cid, cpts],
                        )
                        .map_err(Self::map_err)?;
                    }
                    conn.execute_batch("COMMIT").map_err(Self::map_err)
                };
                let outcome = write_all();
                if outcome.is_err() {
                    // Best effort: surface the original error, not the rollback's.
                    let _ = conn.execute_batch("ROLLBACK");
                }
                outcome
            }
            UpdateStateChange::Primary { pts, date, seq } => conn
                .execute(
                    "INSERT INTO update_state (id, pts, qts, date, seq) VALUES (1,?1,0,?2,?3)
                     ON CONFLICT(id) DO UPDATE SET pts=excluded.pts, date=excluded.date,
                     seq=excluded.seq",
                    rusqlite::params![pts, date, seq],
                )
                .map(|_| ())
                .map_err(Self::map_err),
            UpdateStateChange::Secondary { qts } => conn
                .execute(
                    "INSERT INTO update_state (id, pts, qts, date, seq) VALUES (1,0,?1,0,0)
                     ON CONFLICT(id) DO UPDATE SET qts = excluded.qts",
                    rusqlite::params![qts],
                )
                .map(|_| ())
                .map_err(Self::map_err),
            UpdateStateChange::Channel { id, pts } => conn
                .execute(
                    "INSERT INTO channel_pts (channel_id, pts) VALUES (?1, ?2)
                     ON CONFLICT(channel_id) DO UPDATE SET pts = excluded.pts",
                    rusqlite::params![id, pts],
                )
                .map(|_| ())
                .map_err(Self::map_err),
        }
    }

    /// Upserts the `peers` row keyed by `peer.id`.
    fn cache_peer(&self, peer: &CachedPeer) -> io::Result<()> {
        let conn = self.conn.lock().unwrap();
        conn.execute(
            "INSERT INTO peers (id, access_hash, is_channel, is_chat) VALUES (?1, ?2, ?3, ?4)
             ON CONFLICT(id) DO UPDATE SET
               access_hash = excluded.access_hash,
               is_channel  = excluded.is_channel,
               is_chat     = excluded.is_chat",
            rusqlite::params![
                peer.id,
                peer.access_hash,
                peer.is_channel as i32,
                peer.is_chat as i32
            ],
        )
        .map(|_| ())
        .map_err(Self::map_err)
    }
}
1710
1711// LibSqlBackend
1712
/// libSQL-backed session (Turso / embedded replica / in-process).
///
/// Enabled with the `libsql-session` Cargo feature.
///
/// The libSQL API is async; since [`SessionBackend`] methods are sync we
/// block via `tokio::runtime::Handle::current().block_on(...)`.  Always
/// call from inside a Tokio runtime (i.e. the same runtime as the rest of
/// `ferogram`).
///
/// NOTE(review): `Handle::current()` needs a runtime context, but Tokio
/// documents `Handle::block_on` as panicking when called from within an
/// asynchronous execution context. Confirm which threads are expected to call
/// into this backend.
///
/// # Connecting
///
/// | Mode              | Constructor                        |
/// |-------------------|------------------------------------|
/// | Local file        | `LibSqlBackend::open_local(path)`  |
/// | In-memory         | `LibSqlBackend::in_memory()`       |
/// | Turso remote      | `LibSqlBackend::open_remote(url, token)` |
/// | Embedded replica  | `LibSqlBackend::open_replica(path, url, token)` |
#[cfg(feature = "libsql-session")]
pub struct LibSqlBackend {
    // NOTE(review): `open_local` initializes this field with
    // `Arc<tokio::sync::Mutex<libsql::Connection>>`, which does not match the
    // type declared here. Confirm which of the two is intended.
    conn: libsql::Connection,
    // Human-readable identifier for this database (the path for local files).
    label: String,
}
1735
1736#[cfg(feature = "libsql-session")]
1737impl LibSqlBackend {
    /// DDL batch that creates the six session tables (`meta`, `dcs`,
    /// `update_state`, `channel_pts`, `peers`, `min_peers`).
    ///
    /// Every statement uses `IF NOT EXISTS`, so applying it to an existing
    /// database leaves existing tables untouched.
    const SCHEMA: &'static str = "
        CREATE TABLE IF NOT EXISTS meta (
            key   TEXT    PRIMARY KEY,
            value INTEGER NOT NULL DEFAULT 0
        );
        CREATE TABLE IF NOT EXISTS dcs (
            dc_id       INTEGER NOT NULL,
            flags       INTEGER NOT NULL DEFAULT 0,
            addr        TEXT    NOT NULL,
            auth_key    BLOB,
            first_salt  INTEGER NOT NULL DEFAULT 0,
            time_offset INTEGER NOT NULL DEFAULT 0,
            PRIMARY KEY (dc_id, flags)
        );
        CREATE TABLE IF NOT EXISTS update_state (
            id   INTEGER PRIMARY KEY CHECK (id = 1),
            pts  INTEGER NOT NULL DEFAULT 0,
            qts  INTEGER NOT NULL DEFAULT 0,
            date INTEGER NOT NULL DEFAULT 0,
            seq  INTEGER NOT NULL DEFAULT 0
        );
        CREATE TABLE IF NOT EXISTS channel_pts (
            channel_id INTEGER PRIMARY KEY,
            pts        INTEGER NOT NULL
        );
        CREATE TABLE IF NOT EXISTS peers (
            id          INTEGER PRIMARY KEY,
            access_hash INTEGER NOT NULL,
            is_channel  INTEGER NOT NULL DEFAULT 0,
            is_chat     INTEGER NOT NULL DEFAULT 0
        );
        CREATE TABLE IF NOT EXISTS min_peers (
            user_id INTEGER PRIMARY KEY,
            peer_id INTEGER NOT NULL,
            msg_id  INTEGER NOT NULL
        );
    ";
1775
1776    fn block<F, T>(fut: F) -> io::Result<T>
1777    where
1778        F: std::future::Future<Output = Result<T, libsql::Error>>,
1779    {
1780        tokio::runtime::Handle::current()
1781            .block_on(fut)
1782            .map_err(io::Error::other)
1783    }
1784
1785    async fn apply_schema(conn: &libsql::Connection) -> Result<(), libsql::Error> {
1786        conn.execute_batch(Self::SCHEMA).await
1787    }
1788
1789    /// Open a local file database.
1790    pub fn open_local(path: impl Into<PathBuf>) -> io::Result<Self> {
1791        let path = path.into();
1792        let label = path.display().to_string();
1793        let db = Self::block(async { libsql::Builder::new_local(path).build().await })?;
1794        let conn = Self::block(async { db.connect() }).map_err(io::Error::other)?;
1795        Self::block(Self::apply_schema(&conn))?;
1796        Ok(Self {
1797            conn: std::sync::Arc::new(tokio::sync::Mutex::new(conn)),
1798            label,
1799        })
1800    }
1801
1802    /// Open an in-process in-memory database (useful for tests).
1803    pub fn in_memory() -> io::Result<Self> {
1804        let db = Self::block(async { libsql::Builder::new_local(":memory:").build().await })?;
1805        let conn = Self::block(async { db.connect() }).map_err(io::Error::other)?;
1806        Self::block(Self::apply_schema(&conn))?;
1807        Ok(Self {
1808            conn: std::sync::Arc::new(tokio::sync::Mutex::new(conn)),
1809            label: ":memory:".into(),
1810        })
1811    }
1812
1813    /// Connect to a remote Turso database.
1814    pub fn open_remote(url: impl Into<String>, auth_token: impl Into<String>) -> io::Result<Self> {
1815        let url = url.into();
1816        let label = url.clone();
1817        let db = Self::block(async {
1818            libsql::Builder::new_remote(url, auth_token.into())
1819                .build()
1820                .await
1821        })?;
1822        let conn = Self::block(async { db.connect() }).map_err(io::Error::other)?;
1823        Self::block(Self::apply_schema(&conn))?;
1824        Ok(Self {
1825            conn: std::sync::Arc::new(tokio::sync::Mutex::new(conn)),
1826            label,
1827        })
1828    }
1829
1830    /// Open an embedded replica (local file + Turso remote sync).
1831    pub fn open_replica(
1832        path: impl Into<PathBuf>,
1833        url: impl Into<String>,
1834        auth_token: impl Into<String>,
1835    ) -> io::Result<Self> {
1836        let path = path.into();
1837        let label = format!("{} (replica of {})", path.display(), url.into());
1838        let db = Self::block(async {
1839            libsql::Builder::new_remote_replica(path, url.into(), auth_token.into())
1840                .build()
1841                .await
1842        })?;
1843        let conn = Self::block(async { db.connect() }).map_err(io::Error::other)?;
1844        Self::block(Self::apply_schema(&conn))?;
1845        Ok(Self {
1846            conn: std::sync::Arc::new(tokio::sync::Mutex::new(conn)),
1847            label,
1848        })
1849    }
1850
1851    async fn read_session_async(
1852        conn: &libsql::Connection,
1853    ) -> Result<PersistedSession, libsql::Error> {
1854        use libsql::de;
1855
1856        // home_dc_id
1857        let home_dc_id: i32 = conn
1858            .query("SELECT value FROM meta WHERE key = 'home_dc_id'", ())
1859            .await?
1860            .next()
1861            .await?
1862            .map(|r| r.get::<i32>(0))
1863            .transpose()?
1864            .unwrap_or(0);
1865
1866        // dcs
1867        let mut rows = conn
1868            .query(
1869                "SELECT dc_id, flags, addr, auth_key, first_salt, time_offset FROM dcs",
1870                (),
1871            )
1872            .await?;
1873        let mut dcs = Vec::new();
1874        while let Some(row) = rows.next().await? {
1875            let dc_id: i32 = row.get(0)?;
1876            let flags_raw: u8 = row.get::<i64>(1)? as u8;
1877            let addr: String = row.get(2)?;
1878            let key_blob: Option<Vec<u8>> = row.get(3)?;
1879            let first_salt: i64 = row.get(4)?;
1880            let time_offset: i32 = row.get(5)?;
1881            let auth_key = match key_blob {
1882                Some(b) if b.len() == 256 => {
1883                    let mut k = [0u8; 256];
1884                    k.copy_from_slice(&b);
1885                    Some(k)
1886                }
1887                Some(b) => {
1888                    return Err(libsql::Error::Misuse(format!(
1889                        "auth_key blob must be 256 bytes, got {}",
1890                        b.len()
1891                    )));
1892                }
1893                None => None,
1894            };
1895            dcs.push(DcEntry {
1896                dc_id,
1897                addr,
1898                auth_key,
1899                first_salt,
1900                time_offset,
1901                flags: DcFlags(flags_raw),
1902            });
1903        }
1904
1905        // update_state
1906        let mut us_row = conn
1907            .query(
1908                "SELECT pts, qts, date, seq FROM update_state WHERE id = 1",
1909                (),
1910            )
1911            .await?;
1912        let updates_state = if let Some(r) = us_row.next().await? {
1913            UpdatesStateSnap {
1914                pts: r.get(0)?,
1915                qts: r.get(1)?,
1916                date: r.get(2)?,
1917                seq: r.get(3)?,
1918                channels: vec![],
1919            }
1920        } else {
1921            UpdatesStateSnap::default()
1922        };
1923
1924        // channel_pts
1925        let mut ch_rows = conn
1926            .query("SELECT channel_id, pts FROM channel_pts", ())
1927            .await?;
1928        let mut channels = Vec::new();
1929        while let Some(r) = ch_rows.next().await? {
1930            channels.push((r.get::<i64>(0)?, r.get::<i32>(1)?));
1931        }
1932
1933        // peers
1934        let mut peer_rows = conn
1935            .query("SELECT id, access_hash, is_channel, is_chat FROM peers", ())
1936            .await?;
1937        let mut peers = Vec::new();
1938        while let Some(r) = peer_rows.next().await? {
1939            peers.push(CachedPeer {
1940                id: r.get(0)?,
1941                access_hash: r.get(1)?,
1942                is_channel: r.get::<i32>(2)? != 0,
1943                is_chat: r.get::<i32>(3)? != 0,
1944            });
1945        }
1946
1947        // min_peers
1948        let mut min_rows = conn
1949            .query("SELECT user_id, peer_id, msg_id FROM min_peers", ())
1950            .await?;
1951        let mut min_peers = Vec::new();
1952        while let Some(r) = min_rows.next().await? {
1953            min_peers.push(CachedMinPeer {
1954                user_id: r.get(0)?,
1955                peer_id: r.get(1)?,
1956                msg_id: r.get(2)?,
1957            });
1958        }
1959
1960        Ok(PersistedSession {
1961            home_dc_id,
1962            dcs,
1963            updates_state: UpdatesStateSnap {
1964                channels,
1965                ..updates_state
1966            },
1967            peers,
1968            min_peers,
1969        })
1970    }
1971
1972    async fn write_session_async(
1973        conn: &libsql::Connection,
1974        s: &PersistedSession,
1975    ) -> Result<(), libsql::Error> {
1976        conn.execute_batch("BEGIN IMMEDIATE").await?;
1977
1978        conn.execute(
1979            "INSERT INTO meta (key, value) VALUES ('home_dc_id', ?1)
1980             ON CONFLICT(key) DO UPDATE SET value = excluded.value",
1981            libsql::params![s.home_dc_id],
1982        )
1983        .await?;
1984
1985        conn.execute("DELETE FROM dcs", ()).await?;
1986        for d in &s.dcs {
1987            conn.execute(
1988                "INSERT INTO dcs (dc_id, flags, addr, auth_key, first_salt, time_offset)
1989                 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1990                libsql::params![
1991                    d.dc_id,
1992                    d.flags.0 as i64,
1993                    d.addr.clone(),
1994                    d.auth_key.map(|k| k.to_vec()),
1995                    d.first_salt,
1996                    d.time_offset,
1997                ],
1998            )
1999            .await?;
2000        }
2001
2002        let us = &s.updates_state;
2003        conn.execute(
2004            "INSERT INTO update_state (id, pts, qts, date, seq) VALUES (1,?1,?2,?3,?4)
2005             ON CONFLICT(id) DO UPDATE SET
2006               pts  = MAX(excluded.pts,  update_state.pts),
2007               qts  = MAX(excluded.qts,  update_state.qts),
2008               date = excluded.date,
2009               seq  = excluded.seq",
2010            libsql::params![us.pts, us.qts, us.date, us.seq],
2011        )
2012        .await?;
2013
2014        conn.execute("DELETE FROM channel_pts", ()).await?;
2015        for &(cid, cpts) in &us.channels {
2016            conn.execute(
2017                "INSERT INTO channel_pts (channel_id, pts) VALUES (?1,?2)",
2018                libsql::params![cid, cpts],
2019            )
2020            .await?;
2021        }
2022
2023        conn.execute("DELETE FROM peers", ()).await?;
2024        for p in &s.peers {
2025            conn.execute(
2026                "INSERT INTO peers (id, access_hash, is_channel, is_chat) VALUES (?1,?2,?3,?4)",
2027                libsql::params![p.id, p.access_hash, p.is_channel as i32, p.is_chat as i32],
2028            )
2029            .await?;
2030        }
2031
2032        conn.execute("DELETE FROM min_peers", ()).await?;
2033        for m in &s.min_peers {
2034            conn.execute(
2035                "INSERT INTO min_peers (user_id, peer_id, msg_id) VALUES (?1,?2,?3)",
2036                libsql::params![m.user_id, m.peer_id, m.msg_id],
2037            )
2038            .await?;
2039        }
2040
2041        conn.execute_batch("COMMIT").await
2042    }
2043}
2044
#[cfg(feature = "libsql-session")]
impl SessionBackend for LibSqlBackend {
    /// Persist the whole session, replacing what is stored.
    ///
    /// # Errors
    ///
    /// Returns an [`io::Error`] wrapping the libSQL failure.
    fn save(&self, session: &PersistedSession) -> io::Result<()> {
        let conn = self.conn.clone();
        let session = session.clone();
        Self::block(async move {
            let conn = conn.lock().await;
            Self::write_session_async(&conn, &session).await
        })
    }

    /// Load the stored session, or `None` when the `meta` table is empty.
    ///
    /// The existence check and the read run under a single lock
    /// acquisition, so another user of this backend cannot modify the
    /// tables between the two steps.
    ///
    /// # Errors
    ///
    /// Returns an [`io::Error`] wrapping any query or decode failure.
    fn load(&self) -> io::Result<Option<PersistedSession>> {
        let conn = self.conn.clone();
        Self::block(async move {
            let conn = conn.lock().await;

            let mut rows = conn.query("SELECT COUNT(*) FROM meta", ()).await?;
            let count: i64 = match rows.next().await? {
                Some(row) => row.get(0)?,
                None => 0,
            };
            // Release the cursor before issuing the follow-up queries.
            drop(rows);

            let session = if count == 0 {
                None
            } else {
                Some(Self::read_session_async(&conn).await?)
            };
            Ok::<_, libsql::Error>(session)
        })
    }

    /// Remove every stored row from all session tables in one transaction.
    ///
    /// If the batch fails, a best-effort `ROLLBACK` is issued so the shared
    /// connection is not left inside an open transaction.
    ///
    /// # Errors
    ///
    /// Returns an [`io::Error`] wrapping the libSQL failure.
    fn delete(&self) -> io::Result<()> {
        let conn = self.conn.clone();
        Self::block(async move {
            let conn = conn.lock().await;
            let outcome = conn
                .execute_batch(
                    "BEGIN IMMEDIATE;
                 DELETE FROM meta;
                 DELETE FROM dcs;
                 DELETE FROM update_state;
                 DELETE FROM channel_pts;
                 DELETE FROM peers;
                 DELETE FROM min_peers;
                 COMMIT;",
                )
                .await;
            if outcome.is_err() {
                // The rollback result is ignored; the batch error is what
                // the caller needs to see.
                let _ = conn.execute_batch("ROLLBACK").await;
            }
            outcome
        })
    }

    /// The label chosen at construction time (path, URL or `:memory:`).
    fn name(&self) -> &str {
        &self.label
    }

    // Granular overrides

    /// Upsert a single DC row, keyed by `(dc_id, flags)`.
    ///
    /// # Errors
    ///
    /// Returns an [`io::Error`] wrapping the libSQL failure.
    fn update_dc(&self, entry: &DcEntry) -> io::Result<()> {
        let conn = self.conn.clone();
        // Copy the fields out so the `async move` block owns its inputs.
        let (dc_id, addr, key, salt, off, flags) = (
            entry.dc_id,
            entry.addr.clone(),
            entry.auth_key.map(|k| k.to_vec()),
            entry.first_salt,
            entry.time_offset,
            entry.flags.0 as i64,
        );
        Self::block(async move {
            let conn = conn.lock().await;
            // Placeholders are numbered by parameter position, not column
            // order: `flags` is bound last (?6) but is the second column.
            conn.execute(
                "INSERT INTO dcs (dc_id, flags, addr, auth_key, first_salt, time_offset)
                 VALUES (?1,?6,?2,?3,?4,?5)
                 ON CONFLICT(dc_id, flags) DO UPDATE SET
                   addr=excluded.addr, auth_key=excluded.auth_key,
                   first_salt=excluded.first_salt, time_offset=excluded.time_offset",
                libsql::params![dc_id, addr, key, salt, off, flags],
            )
            .await
            .map(|_| ())
        })
    }

    /// Upsert the `home_dc_id` entry of the `meta` table.
    ///
    /// # Errors
    ///
    /// Returns an [`io::Error`] wrapping the libSQL failure.
    fn set_home_dc(&self, dc_id: i32) -> io::Result<()> {
        let conn = self.conn.clone();
        Self::block(async move {
            let conn = conn.lock().await;
            conn.execute(
                "INSERT INTO meta (key, value) VALUES ('home_dc_id',?1)
                 ON CONFLICT(key) DO UPDATE SET value=excluded.value",
                libsql::params![dc_id],
            )
            .await
            .map(|_| ())
        })
    }

    /// Apply a partial or full update-state change.
    ///
    /// - `All`: overwrites the single `update_state` row and replaces the
    ///   whole `channel_pts` table, atomically (one transaction, rolled back
    ///   on failure).
    /// - `Primary`: overwrites `pts`, `date` and `seq`; `qts` is left
    ///   untouched (or set to `0` if the row did not exist yet).
    /// - `Secondary`: overwrites `qts` only (other columns are `0` if the row
    ///   did not exist yet).
    /// - `Channel`: upserts one `channel_pts` row.
    ///
    /// # Errors
    ///
    /// Returns an [`io::Error`] wrapping the libSQL failure.
    fn apply_update_state(&self, update: UpdateStateChange) -> io::Result<()> {
        let conn = self.conn.clone();
        Self::block(async move {
            let conn = conn.lock().await;
            match update {
                UpdateStateChange::All(snap) => {
                    // Several statements: run them in one transaction so a
                    // failure cannot leave `channel_pts` half-rewritten.
                    conn.execute_batch("BEGIN IMMEDIATE").await?;
                    let outcome: Result<(), libsql::Error> = async {
                        conn.execute(
                            "INSERT INTO update_state (id,pts,qts,date,seq) VALUES (1,?1,?2,?3,?4)
                             ON CONFLICT(id) DO UPDATE SET pts=excluded.pts,qts=excluded.qts,
                             date=excluded.date,seq=excluded.seq",
                            libsql::params![snap.pts, snap.qts, snap.date, snap.seq],
                        )
                        .await?;
                        conn.execute("DELETE FROM channel_pts", ()).await?;
                        for &(cid, cpts) in &snap.channels {
                            conn.execute(
                                "INSERT INTO channel_pts (channel_id,pts) VALUES (?1,?2)",
                                libsql::params![cid, cpts],
                            )
                            .await?;
                        }
                        conn.execute_batch("COMMIT").await
                    }
                    .await;
                    if outcome.is_err() {
                        // Best effort; the original error is the one returned.
                        let _ = conn.execute_batch("ROLLBACK").await;
                    }
                    outcome
                }
                UpdateStateChange::Primary { pts, date, seq } => conn
                    .execute(
                        "INSERT INTO update_state (id,pts,qts,date,seq) VALUES (1,?1,0,?2,?3)
                         ON CONFLICT(id) DO UPDATE SET pts=excluded.pts,date=excluded.date,
                         seq=excluded.seq",
                        libsql::params![pts, date, seq],
                    )
                    .await
                    .map(|_| ()),
                UpdateStateChange::Secondary { qts } => conn
                    .execute(
                        "INSERT INTO update_state (id,pts,qts,date,seq) VALUES (1,0,?1,0,0)
                         ON CONFLICT(id) DO UPDATE SET qts=excluded.qts",
                        libsql::params![qts],
                    )
                    .await
                    .map(|_| ()),
                UpdateStateChange::Channel { id, pts } => conn
                    .execute(
                        "INSERT INTO channel_pts (channel_id,pts) VALUES (?1,?2)
                         ON CONFLICT(channel_id) DO UPDATE SET pts=excluded.pts",
                        libsql::params![id, pts],
                    )
                    .await
                    .map(|_| ()),
            }
        })
    }

    /// Upsert a single peer row, keyed by peer id.
    ///
    /// # Errors
    ///
    /// Returns an [`io::Error`] wrapping the libSQL failure.
    fn cache_peer(&self, peer: &CachedPeer) -> io::Result<()> {
        let conn = self.conn.clone();
        // Booleans are stored as 0/1 integers, matching the schema columns.
        let (id, hash, is_ch, is_ct) = (
            peer.id,
            peer.access_hash,
            peer.is_channel as i32,
            peer.is_chat as i32,
        );
        Self::block(async move {
            let conn = conn.lock().await;
            conn.execute(
                "INSERT INTO peers (id,access_hash,is_channel,is_chat) VALUES (?1,?2,?3,?4)
                 ON CONFLICT(id) DO UPDATE SET
                   access_hash=excluded.access_hash,
                   is_channel=excluded.is_channel,
                   is_chat=excluded.is_chat",
                libsql::params![id, hash, is_ch, is_ct],
            )
            .await
            .map(|_| ())
        })
    }
}