1use std::io;
52use std::path::PathBuf;
53
54use crate::session::{CachedPeer, DcEntry, PersistedSession, UpdatesStateSnap};
55
56pub trait SessionBackend: Send + Sync {
64 fn save(&self, session: &PersistedSession) -> io::Result<()>;
65 fn load(&self) -> io::Result<Option<PersistedSession>>;
66 fn delete(&self) -> io::Result<()>;
67
68 fn name(&self) -> &str;
70
71 fn update_dc(&self, entry: &DcEntry) -> io::Result<()> {
85 let mut s = self.load()?.unwrap_or_default();
86 if let Some(existing) = s.dcs.iter_mut().find(|d| d.dc_id == entry.dc_id) {
88 *existing = entry.clone();
89 } else {
90 s.dcs.push(entry.clone());
91 }
92 self.save(&s)
93 }
94
95 fn set_home_dc(&self, dc_id: i32) -> io::Result<()> {
102 let mut s = self.load()?.unwrap_or_default();
103 s.home_dc_id = dc_id;
104 self.save(&s)
105 }
106
107 fn apply_update_state(&self, update: UpdateStateChange) -> io::Result<()> {
113 let mut s = self.load()?.unwrap_or_default();
114 update.apply_to(&mut s.updates_state);
115 self.save(&s)
116 }
117
118 fn cache_peer(&self, peer: &CachedPeer) -> io::Result<()> {
125 let mut s = self.load()?.unwrap_or_default();
126 if let Some(existing) = s.peers.iter_mut().find(|p| p.id == peer.id) {
127 *existing = peer.clone();
128 } else {
129 s.peers.push(peer.clone());
130 }
131 self.save(&s)
132 }
133}
134
135#[derive(Debug, Clone)]
149pub enum UpdateStateChange {
150 All(UpdatesStateSnap),
152 Primary { pts: i32, date: i32, seq: i32 },
154 Secondary { qts: i32 },
156 Channel { id: i64, pts: i32 },
158}
159
160impl UpdateStateChange {
161 pub fn apply_to(&self, snap: &mut UpdatesStateSnap) {
163 match self {
164 Self::All(new_snap) => *snap = new_snap.clone(),
165 Self::Primary { pts, date, seq } => {
166 snap.pts = *pts;
167 snap.date = *date;
168 snap.seq = *seq;
169 }
170 Self::Secondary { qts } => {
171 snap.qts = *qts;
172 }
173 Self::Channel { id, pts } => {
174 if let Some(existing) = snap.channels.iter_mut().find(|c| c.0 == *id) {
176 existing.1 = *pts;
177 } else {
178 snap.channels.push((*id, *pts));
179 }
180 }
181 }
182 }
183}
184
/// Session backend bound to a single filesystem path.
pub struct BinaryFileBackend {
    path: PathBuf,
}

impl BinaryFileBackend {
    /// Creates a backend for `path`; the filesystem is not touched here.
    pub fn new(path: impl Into<PathBuf>) -> Self {
        let path = path.into();
        Self { path }
    }

    /// Returns the path this backend was created with.
    pub fn path(&self) -> &std::path::Path {
        self.path.as_path()
    }
}
201
202impl SessionBackend for BinaryFileBackend {
203 fn save(&self, session: &PersistedSession) -> io::Result<()> {
204 session.save(&self.path)
205 }
206
207 fn load(&self) -> io::Result<Option<PersistedSession>> {
208 if !self.path.exists() {
209 return Ok(None);
210 }
211 match PersistedSession::load(&self.path) {
212 Ok(s) => Ok(Some(s)),
213 Err(e) => {
214 let bak = self.path.with_extension("bak");
215 tracing::warn!(
216 "[layer] Session file {:?} is corrupt ({e}); \
217 renaming to {:?} and starting fresh",
218 self.path,
219 bak
220 );
221 let _ = std::fs::rename(&self.path, &bak);
222 Ok(None)
223 }
224 }
225 }
226
227 fn delete(&self) -> io::Result<()> {
228 if self.path.exists() {
229 std::fs::remove_file(&self.path)?;
230 }
231 Ok(())
232 }
233
234 fn name(&self) -> &str {
235 "binary-file"
236 }
237
238 }
241
242#[derive(Default)]
250pub struct InMemoryBackend {
251 data: std::sync::Mutex<Option<PersistedSession>>,
252}
253
254impl InMemoryBackend {
255 pub fn new() -> Self {
256 Self::default()
257 }
258
259 pub fn snapshot(&self) -> Option<PersistedSession> {
261 self.data.lock().unwrap().clone()
262 }
263}
264
265impl SessionBackend for InMemoryBackend {
266 fn save(&self, s: &PersistedSession) -> io::Result<()> {
267 *self.data.lock().unwrap() = Some(s.clone());
268 Ok(())
269 }
270
271 fn load(&self) -> io::Result<Option<PersistedSession>> {
272 Ok(self.data.lock().unwrap().clone())
273 }
274
275 fn delete(&self) -> io::Result<()> {
276 *self.data.lock().unwrap() = None;
277 Ok(())
278 }
279
280 fn name(&self) -> &str {
281 "in-memory"
282 }
283
284 fn update_dc(&self, entry: &DcEntry) -> io::Result<()> {
287 let mut guard = self.data.lock().unwrap();
288 let s = guard.get_or_insert_with(PersistedSession::default);
289 if let Some(existing) = s.dcs.iter_mut().find(|d| d.dc_id == entry.dc_id) {
290 *existing = entry.clone();
291 } else {
292 s.dcs.push(entry.clone());
293 }
294 Ok(())
295 }
296
297 fn set_home_dc(&self, dc_id: i32) -> io::Result<()> {
298 let mut guard = self.data.lock().unwrap();
299 let s = guard.get_or_insert_with(PersistedSession::default);
300 s.home_dc_id = dc_id;
301 Ok(())
302 }
303
304 fn apply_update_state(&self, update: UpdateStateChange) -> io::Result<()> {
305 let mut guard = self.data.lock().unwrap();
306 let s = guard.get_or_insert_with(PersistedSession::default);
307 update.apply_to(&mut s.updates_state);
308 Ok(())
309 }
310
311 fn cache_peer(&self, peer: &CachedPeer) -> io::Result<()> {
312 let mut guard = self.data.lock().unwrap();
313 let s = guard.get_or_insert_with(PersistedSession::default);
314 if let Some(existing) = s.peers.iter_mut().find(|p| p.id == peer.id) {
315 *existing = peer.clone();
316 } else {
317 s.peers.push(peer.clone());
318 }
319 Ok(())
320 }
321}
322
/// Session backend that holds the session as a `String` behind a mutex.
pub struct StringSessionBackend {
    data: std::sync::Mutex<String>,
}

impl StringSessionBackend {
    /// Creates a backend holding `s` as its initial string.
    pub fn new(s: impl Into<String>) -> Self {
        let initial: String = s.into();
        Self {
            data: std::sync::Mutex::new(initial),
        }
    }

    /// Returns a copy of the string currently held.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn current(&self) -> String {
        let guard = self.data.lock().unwrap();
        String::clone(&guard)
    }
}
341
342impl SessionBackend for StringSessionBackend {
343 fn save(&self, session: &PersistedSession) -> io::Result<()> {
344 *self.data.lock().unwrap() = session.to_string();
345 Ok(())
346 }
347
348 fn load(&self) -> io::Result<Option<PersistedSession>> {
349 let s = self.data.lock().unwrap().clone();
350 if s.trim().is_empty() {
351 return Ok(None);
352 }
353 PersistedSession::from_string(&s).map(Some)
354 }
355
356 fn delete(&self) -> io::Result<()> {
357 *self.data.lock().unwrap() = String::new();
358 Ok(())
359 }
360
361 fn name(&self) -> &str {
362 "string-session"
363 }
364}
365
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a flag-less DC entry whose address embeds `id` in the last octet.
    fn make_dc(id: i32) -> DcEntry {
        DcEntry {
            dc_id: id,
            addr: format!("1.2.3.{id}:443"),
            auth_key: None,
            first_salt: 0,
            time_offset: 0,
            flags: DcFlags::NONE,
        }
    }

    /// Builds a non-channel cached peer with the given id and access hash.
    fn make_peer(id: i64, hash: i64) -> CachedPeer {
        CachedPeer {
            id,
            access_hash: hash,
            is_channel: false,
        }
    }

    // --- InMemoryBackend: whole-session operations ---

    #[test]
    fn inmemory_load_returns_none_when_empty() {
        let b = InMemoryBackend::new();
        assert!(b.load().unwrap().is_none());
    }

    #[test]
    fn inmemory_save_then_load_round_trips() {
        let b = InMemoryBackend::new();
        let mut s = PersistedSession::default();
        s.home_dc_id = 3;
        s.dcs.push(make_dc(3));
        b.save(&s).unwrap();

        let loaded = b.load().unwrap().unwrap();
        assert_eq!(loaded.home_dc_id, 3);
        assert_eq!(loaded.dcs.len(), 1);
    }

    #[test]
    fn inmemory_delete_clears_state() {
        let b = InMemoryBackend::new();
        let mut s = PersistedSession::default();
        s.home_dc_id = 2;
        b.save(&s).unwrap();
        b.delete().unwrap();
        assert!(b.load().unwrap().is_none());
    }

    // --- InMemoryBackend: granular mutators ---

    #[test]
    fn inmemory_update_dc_inserts_new() {
        let b = InMemoryBackend::new();
        b.update_dc(&make_dc(4)).unwrap();
        let s = b.snapshot().unwrap();
        assert_eq!(s.dcs.len(), 1);
        assert_eq!(s.dcs[0].dc_id, 4);
    }

    #[test]
    fn inmemory_update_dc_replaces_existing() {
        let b = InMemoryBackend::new();
        b.update_dc(&make_dc(2)).unwrap();
        let mut updated = make_dc(2);
        updated.addr = "9.9.9.9:443".to_string();
        b.update_dc(&updated).unwrap();

        // Same dc_id and flags: the entry is replaced, not duplicated.
        let s = b.snapshot().unwrap();
        assert_eq!(s.dcs.len(), 1);
        assert_eq!(s.dcs[0].addr, "9.9.9.9:443");
    }

    #[test]
    fn inmemory_set_home_dc() {
        let b = InMemoryBackend::new();
        b.set_home_dc(5).unwrap();
        assert_eq!(b.snapshot().unwrap().home_dc_id, 5);
    }

    #[test]
    fn inmemory_cache_peer_inserts() {
        let b = InMemoryBackend::new();
        b.cache_peer(&make_peer(100, 0xdeadbeef)).unwrap();
        let s = b.snapshot().unwrap();
        assert_eq!(s.peers.len(), 1);
        assert_eq!(s.peers[0].id, 100);
    }

    #[test]
    fn inmemory_cache_peer_updates_existing() {
        let b = InMemoryBackend::new();
        b.cache_peer(&make_peer(100, 111)).unwrap();
        b.cache_peer(&make_peer(100, 222)).unwrap();
        let s = b.snapshot().unwrap();
        assert_eq!(s.peers.len(), 1);
        assert_eq!(s.peers[0].access_hash, 222);
    }

    // --- UpdateStateChange::apply_to ---

    #[test]
    fn update_state_primary() {
        let mut snap = UpdatesStateSnap {
            pts: 0,
            qts: 0,
            date: 0,
            seq: 0,
            channels: vec![],
        };
        UpdateStateChange::Primary {
            pts: 10,
            date: 20,
            seq: 30,
        }
        .apply_to(&mut snap);
        assert_eq!(snap.pts, 10);
        assert_eq!(snap.date, 20);
        assert_eq!(snap.seq, 30);
        // qts is not part of a Primary change and must stay as it was.
        assert_eq!(snap.qts, 0);
    }

    #[test]
    fn update_state_secondary() {
        let mut snap = UpdatesStateSnap {
            pts: 5,
            qts: 0,
            date: 0,
            seq: 0,
            channels: vec![],
        };
        UpdateStateChange::Secondary { qts: 99 }.apply_to(&mut snap);
        assert_eq!(snap.qts, 99);
        // pts is not part of a Secondary change and must stay as it was.
        assert_eq!(snap.pts, 5);
    }

    #[test]
    fn update_state_channel_inserts() {
        let mut snap = UpdatesStateSnap {
            pts: 0,
            qts: 0,
            date: 0,
            seq: 0,
            channels: vec![],
        };
        UpdateStateChange::Channel { id: 12345, pts: 42 }.apply_to(&mut snap);
        assert_eq!(snap.channels, vec![(12345, 42)]);
    }

    #[test]
    fn update_state_channel_updates_existing() {
        let mut snap = UpdatesStateSnap {
            pts: 0,
            qts: 0,
            date: 0,
            seq: 0,
            channels: vec![(12345, 10), (67890, 5)],
        };
        UpdateStateChange::Channel { id: 12345, pts: 99 }.apply_to(&mut snap);
        assert_eq!(snap.channels[0], (12345, 99));
        // The other channel's entry is untouched.
        assert_eq!(snap.channels[1], (67890, 5));
    }

    #[test]
    fn apply_update_state_via_backend() {
        let b = InMemoryBackend::new();
        b.apply_update_state(UpdateStateChange::Primary {
            pts: 7,
            date: 8,
            seq: 9,
        })
        .unwrap();
        let s = b.snapshot().unwrap();
        assert_eq!(s.updates_state.pts, 7);
    }

    // --- Trait-object usage ---

    #[test]
    fn default_update_dc_via_trait_object() {
        let b: Box<dyn SessionBackend> = Box::new(InMemoryBackend::new());
        b.update_dc(&make_dc(1)).unwrap();
        b.update_dc(&make_dc(2)).unwrap();
        let loaded = b.load().unwrap().unwrap();
        assert_eq!(loaded.dcs.len(), 2);
    }

    // --- IPv4 / IPv6 DC entries ---

    /// Builds a DC entry flagged `DcFlags::IPV6` with a bracketed IPv6 address.
    fn make_dc_v6(id: i32) -> DcEntry {
        DcEntry {
            dc_id: id,
            addr: format!("[2001:b28:f23d:f00{}::a]:443", id),
            auth_key: None,
            first_salt: 0,
            time_offset: 0,
            flags: DcFlags::IPV6,
        }
    }

    #[test]
    fn dc_entry_from_parts_ipv4() {
        let dc = DcEntry::from_parts(1, "149.154.175.53", 443, DcFlags::NONE);
        assert_eq!(dc.addr, "149.154.175.53:443");
        assert!(!dc.is_ipv6());
        let sa = dc.socket_addr().unwrap();
        assert_eq!(sa.port(), 443);
    }

    #[test]
    fn dc_entry_from_parts_ipv6() {
        let dc = DcEntry::from_parts(2, "2001:b28:f23d:f001::a", 443, DcFlags::IPV6);
        // The IPv6 host is expected to be wrapped in brackets in `addr`.
        assert_eq!(dc.addr, "[2001:b28:f23d:f001::a]:443");
        assert!(dc.is_ipv6());
        let sa = dc.socket_addr().unwrap();
        assert_eq!(sa.port(), 443);
    }

    #[test]
    fn persisted_session_dc_for_prefers_ipv6() {
        let mut s = PersistedSession::default();
        s.dcs.push(make_dc(2));
        s.dcs.push(make_dc_v6(2));
        // With the boolean set, the IPv6 entry is expected.
        let v6 = s.dc_for(2, true).unwrap();
        assert!(v6.is_ipv6());

        let v4 = s.dc_for(2, false).unwrap();
        assert!(!v4.is_ipv6());
    }

    #[test]
    fn persisted_session_dc_for_falls_back_when_only_ipv4() {
        let mut s = PersistedSession::default();
        s.dcs.push(make_dc(3));
        // Only an IPv4 entry exists, so asking for IPv6 still yields it.
        let dc = s.dc_for(3, true).unwrap();
        assert!(!dc.is_ipv6());
    }

    #[test]
    fn persisted_session_all_dcs_for_returns_both() {
        let mut s = PersistedSession::default();
        s.dcs.push(make_dc(1));
        s.dcs.push(make_dc_v6(1));
        s.dcs.push(make_dc(2));

        assert_eq!(s.all_dcs_for(1).count(), 2);
        assert_eq!(s.all_dcs_for(2).count(), 1);
        assert_eq!(s.all_dcs_for(5).count(), 0);
    }

    #[test]
    fn inmemory_ipv4_and_ipv6_coexist() {
        let b = InMemoryBackend::new();
        b.update_dc(&make_dc(2)).unwrap();
        b.update_dc(&make_dc_v6(2)).unwrap();
        let s = b.snapshot().unwrap();
        // Same dc_id, different flags: both entries must be present.
        assert_eq!(s.dcs.iter().filter(|d| d.dc_id == 2).count(), 2);
    }

    #[test]
    fn binary_roundtrip_ipv4_and_ipv6() {
        let mut s = PersistedSession::default();
        s.home_dc_id = 2;
        s.dcs.push(make_dc(2));
        s.dcs.push(make_dc_v6(2));

        let bytes = s.to_bytes();
        let loaded = PersistedSession::from_bytes(&bytes).unwrap();
        assert_eq!(loaded.dcs.len(), 2);
        assert_eq!(loaded.dcs.iter().filter(|d| d.is_ipv6()).count(), 1);
        assert_eq!(loaded.dcs.iter().filter(|d| !d.is_ipv6()).count(), 1);
    }
}
653
#[cfg(feature = "sqlite-session")]
/// Session backend that stores the session in a SQLite database via `rusqlite`.
///
/// The connection lives behind a `std::sync::Mutex`; `label` is the string
/// reported as the backend's name.
pub struct SqliteBackend {
    conn: std::sync::Mutex<rusqlite::Connection>,
    label: String,
}

#[cfg(feature = "sqlite-session")]
impl SqliteBackend {
    /// Pragmas and `CREATE TABLE IF NOT EXISTS` statements run on every open.
    const SCHEMA: &'static str = "
        PRAGMA journal_mode = WAL;
        PRAGMA synchronous = NORMAL;

        CREATE TABLE IF NOT EXISTS meta (
            key TEXT PRIMARY KEY,
            value INTEGER NOT NULL DEFAULT 0
        );

        CREATE TABLE IF NOT EXISTS dcs (
            dc_id INTEGER NOT NULL,
            flags INTEGER NOT NULL DEFAULT 0,
            addr TEXT NOT NULL,
            auth_key BLOB,
            first_salt INTEGER NOT NULL DEFAULT 0,
            time_offset INTEGER NOT NULL DEFAULT 0,
            PRIMARY KEY (dc_id, flags)
        );

        CREATE TABLE IF NOT EXISTS update_state (
            id INTEGER PRIMARY KEY CHECK (id = 1),
            pts INTEGER NOT NULL DEFAULT 0,
            qts INTEGER NOT NULL DEFAULT 0,
            date INTEGER NOT NULL DEFAULT 0,
            seq INTEGER NOT NULL DEFAULT 0
        );

        CREATE TABLE IF NOT EXISTS channel_pts (
            channel_id INTEGER PRIMARY KEY,
            pts INTEGER NOT NULL
        );

        CREATE TABLE IF NOT EXISTS peers (
            id INTEGER PRIMARY KEY,
            access_hash INTEGER NOT NULL,
            is_channel INTEGER NOT NULL DEFAULT 0
        );
    ";

    /// Opens (or creates) the database at `path` and applies the schema.
    ///
    /// # Errors
    /// Returns an `io::Error` of kind `Other` wrapping the `rusqlite` error.
    pub fn open(path: impl Into<PathBuf>) -> io::Result<Self> {
        let path = path.into();
        let label = path.display().to_string();
        let conn = rusqlite::Connection::open(&path).map_err(Self::map_err)?;
        Self::from_connection(conn, label)
    }

    /// Opens a private in-memory database and applies the schema.
    ///
    /// # Errors
    /// Returns an `io::Error` of kind `Other` wrapping the `rusqlite` error.
    pub fn in_memory() -> io::Result<Self> {
        let conn = rusqlite::Connection::open_in_memory().map_err(Self::map_err)?;
        Self::from_connection(conn, ":memory:".into())
    }

    /// Applies the schema to `conn` and wraps it into a backend.
    fn from_connection(conn: rusqlite::Connection, label: String) -> io::Result<Self> {
        conn.execute_batch(Self::SCHEMA).map_err(Self::map_err)?;
        Ok(Self {
            conn: std::sync::Mutex::new(conn),
            label,
        })
    }

    /// Wraps a `rusqlite` error into an `io::Error` of kind `Other`.
    fn map_err(e: rusqlite::Error) -> io::Error {
        io::Error::new(io::ErrorKind::Other, e)
    }

    /// Turns "query returned no rows" into `Ok(None)`; other errors propagate.
    fn optional_row<T>(res: rusqlite::Result<T>) -> io::Result<Option<T>> {
        match res {
            Ok(value) => Ok(Some(value)),
            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
            Err(e) => Err(Self::map_err(e)),
        }
    }

    /// Reads every table and assembles a `PersistedSession`.
    ///
    /// A missing `home_dc_id` row yields `0` and a missing `update_state` row
    /// yields the default snapshot. Any other SQL or decoding error is
    /// returned rather than being replaced by a default or a dropped row:
    /// a partially read session that is later saved back would overwrite the
    /// rows that failed to load.
    ///
    /// An `auth_key` blob whose length is not 256 bytes is read as `None`.
    fn read_session(conn: &rusqlite::Connection) -> io::Result<PersistedSession> {
        let home_dc_id: i32 = Self::optional_row(conn.query_row(
            "SELECT value FROM meta WHERE key = 'home_dc_id'",
            [],
            |r| r.get(0),
        ))?
        .unwrap_or(0);

        let mut stmt = conn
            .prepare("SELECT dc_id, flags, addr, auth_key, first_salt, time_offset FROM dcs")
            .map_err(Self::map_err)?;
        let dcs: Vec<DcEntry> = stmt
            .query_map([], |row| {
                let dc_id: i32 = row.get(0)?;
                let flags_raw: u8 = row.get(1)?;
                let addr: String = row.get(2)?;
                let key_blob: Option<Vec<u8>> = row.get(3)?;
                let first_salt: i64 = row.get(4)?;
                let time_offset: i32 = row.get(5)?;
                // Only a blob of exactly 256 bytes converts into a key.
                let auth_key = key_blob.and_then(|b| <[u8; 256]>::try_from(b).ok());
                Ok(DcEntry {
                    dc_id,
                    addr,
                    auth_key,
                    first_salt,
                    time_offset,
                    flags: DcFlags(flags_raw),
                })
            })
            .map_err(Self::map_err)?
            .collect::<rusqlite::Result<Vec<_>>>()
            .map_err(Self::map_err)?;

        let updates_state = Self::optional_row(conn.query_row(
            "SELECT pts, qts, date, seq FROM update_state WHERE id = 1",
            [],
            |r| {
                Ok(UpdatesStateSnap {
                    pts: r.get(0)?,
                    qts: r.get(1)?,
                    date: r.get(2)?,
                    seq: r.get(3)?,
                    channels: vec![],
                })
            },
        ))?
        .unwrap_or_default();

        let mut ch_stmt = conn
            .prepare("SELECT channel_id, pts FROM channel_pts")
            .map_err(Self::map_err)?;
        let channels: Vec<(i64, i32)> = ch_stmt
            .query_map([], |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i32>(1)?)))
            .map_err(Self::map_err)?
            .collect::<rusqlite::Result<Vec<_>>>()
            .map_err(Self::map_err)?;

        let mut peer_stmt = conn
            .prepare("SELECT id, access_hash, is_channel FROM peers")
            .map_err(Self::map_err)?;
        let peers: Vec<CachedPeer> = peer_stmt
            .query_map([], |r| {
                Ok(CachedPeer {
                    id: r.get(0)?,
                    access_hash: r.get(1)?,
                    is_channel: r.get::<_, i32>(2)? != 0,
                })
            })
            .map_err(Self::map_err)?
            .collect::<rusqlite::Result<Vec<_>>>()
            .map_err(Self::map_err)?;

        Ok(PersistedSession {
            home_dc_id,
            dcs,
            updates_state: UpdatesStateSnap {
                channels,
                ..updates_state
            },
            peers,
        })
    }

    /// Replaces the stored session with `s` inside one `BEGIN IMMEDIATE`
    /// transaction.
    ///
    /// If any statement (or the `COMMIT`) fails, a `ROLLBACK` is attempted so
    /// the shared connection is not left inside an open transaction, which
    /// would make every later `BEGIN` on it fail.
    fn write_session(conn: &rusqlite::Connection, s: &PersistedSession) -> io::Result<()> {
        conn.execute_batch("BEGIN IMMEDIATE")
            .map_err(Self::map_err)?;

        let outcome =
            Self::write_session_rows(conn, s).and_then(|()| conn.execute_batch("COMMIT"));
        if outcome.is_err() {
            // Best effort: the original error is what the caller needs to see.
            let _ = conn.execute_batch("ROLLBACK");
        }
        outcome.map_err(Self::map_err)
    }

    /// Issues the statements that rewrite every table from `s`.
    /// Must run inside a transaction opened by the caller.
    fn write_session_rows(conn: &rusqlite::Connection, s: &PersistedSession) -> rusqlite::Result<()> {
        conn.execute(
            "INSERT INTO meta (key, value) VALUES ('home_dc_id', ?1)
             ON CONFLICT(key) DO UPDATE SET value = excluded.value",
            rusqlite::params![s.home_dc_id],
        )?;

        conn.execute("DELETE FROM dcs", [])?;
        for d in &s.dcs {
            conn.execute(
                "INSERT INTO dcs (dc_id, flags, addr, auth_key, first_salt, time_offset)
                 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
                rusqlite::params![
                    d.dc_id,
                    d.flags.0,
                    d.addr,
                    d.auth_key.as_ref().map(|k| &k[..]),
                    d.first_salt,
                    d.time_offset,
                ],
            )?;
        }

        let us = &s.updates_state;
        conn.execute(
            "INSERT INTO update_state (id, pts, qts, date, seq) VALUES (1, ?1, ?2, ?3, ?4)
             ON CONFLICT(id) DO UPDATE SET pts=excluded.pts, qts=excluded.qts,
             date=excluded.date, seq=excluded.seq",
            rusqlite::params![us.pts, us.qts, us.date, us.seq],
        )?;

        conn.execute("DELETE FROM channel_pts", [])?;
        for &(cid, cpts) in &us.channels {
            conn.execute(
                "INSERT INTO channel_pts (channel_id, pts) VALUES (?1, ?2)",
                rusqlite::params![cid, cpts],
            )?;
        }

        conn.execute("DELETE FROM peers", [])?;
        for p in &s.peers {
            conn.execute(
                "INSERT INTO peers (id, access_hash, is_channel) VALUES (?1, ?2, ?3)",
                rusqlite::params![p.id, p.access_hash, p.is_channel as i32],
            )?;
        }

        Ok(())
    }
}
921
#[cfg(feature = "sqlite-session")]
/// SQLite implementation. Every method locks the connection mutex and panics
/// if that mutex is poisoned; SQL failures are returned as `io::Error`.
impl SessionBackend for SqliteBackend {
    /// Rewrites all tables from `session` in one transaction.
    fn save(&self, session: &PersistedSession) -> io::Result<()> {
        let conn = self.conn.lock().unwrap();
        Self::write_session(&conn, session)
    }

    /// Returns the stored session, or `Ok(None)` when the `meta` table is
    /// empty.
    ///
    /// NOTE(review): only `meta` is consulted, so rows written solely through
    /// the granular methods (e.g. `update_dc`) before any `save` or
    /// `set_home_dc` are not reported here. Confirm this is intended.
    fn load(&self) -> io::Result<Option<PersistedSession>> {
        let conn = self.conn.lock().unwrap();
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM meta", [], |r| r.get(0))
            .map_err(Self::map_err)?;
        if count == 0 {
            return Ok(None);
        }
        Self::read_session(&conn).map(Some)
    }

    /// Empties every table in one transaction.
    fn delete(&self) -> io::Result<()> {
        let conn = self.conn.lock().unwrap();
        let outcome = conn.execute_batch(
            "BEGIN IMMEDIATE;
             DELETE FROM meta;
             DELETE FROM dcs;
             DELETE FROM update_state;
             DELETE FROM channel_pts;
             DELETE FROM peers;
             COMMIT;",
        );
        if outcome.is_err() {
            // If the batch stopped after BEGIN, close the transaction so the
            // connection stays usable. Harmless when no transaction is open.
            let _ = conn.execute_batch("ROLLBACK");
        }
        outcome.map_err(Self::map_err)
    }

    fn name(&self) -> &str {
        &self.label
    }

    /// Upserts one row of `dcs`, keyed by `(dc_id, flags)`.
    fn update_dc(&self, entry: &DcEntry) -> io::Result<()> {
        let conn = self.conn.lock().unwrap();
        conn.execute(
            "INSERT INTO dcs (dc_id, flags, addr, auth_key, first_salt, time_offset)
             VALUES (?1, ?6, ?2, ?3, ?4, ?5)
             ON CONFLICT(dc_id, flags) DO UPDATE SET
             addr = excluded.addr,
             auth_key = excluded.auth_key,
             first_salt = excluded.first_salt,
             time_offset = excluded.time_offset",
            rusqlite::params![
                entry.dc_id,
                entry.addr,
                entry.auth_key.as_ref().map(|k| &k[..]),
                entry.first_salt,
                entry.time_offset,
                entry.flags.0,
            ],
        )
        .map(|_| ())
        .map_err(Self::map_err)
    }

    /// Upserts the `home_dc_id` row of `meta`.
    fn set_home_dc(&self, dc_id: i32) -> io::Result<()> {
        let conn = self.conn.lock().unwrap();
        conn.execute(
            "INSERT INTO meta (key, value) VALUES ('home_dc_id', ?1)
             ON CONFLICT(key) DO UPDATE SET value = excluded.value",
            rusqlite::params![dc_id],
        )
        .map(|_| ())
        .map_err(Self::map_err)
    }

    /// Applies `update` with targeted statements.
    ///
    /// `All` touches two tables with several statements, so it runs inside a
    /// transaction and is rolled back on failure. The other variants are
    /// single upserts.
    fn apply_update_state(&self, update: UpdateStateChange) -> io::Result<()> {
        let conn = self.conn.lock().unwrap();
        match update {
            UpdateStateChange::All(snap) => {
                conn.execute_batch("BEGIN IMMEDIATE")
                    .map_err(Self::map_err)?;
                let outcome = (|| -> rusqlite::Result<()> {
                    conn.execute(
                        "INSERT INTO update_state (id, pts, qts, date, seq) VALUES (1,?1,?2,?3,?4)
                         ON CONFLICT(id) DO UPDATE SET
                         pts=excluded.pts, qts=excluded.qts,
                         date=excluded.date, seq=excluded.seq",
                        rusqlite::params![snap.pts, snap.qts, snap.date, snap.seq],
                    )?;
                    conn.execute("DELETE FROM channel_pts", [])?;
                    for &(cid, cpts) in &snap.channels {
                        conn.execute(
                            "INSERT INTO channel_pts (channel_id, pts) VALUES (?1, ?2)",
                            rusqlite::params![cid, cpts],
                        )?;
                    }
                    conn.execute_batch("COMMIT")
                })();
                if outcome.is_err() {
                    // Best effort; the original error is what gets reported.
                    let _ = conn.execute_batch("ROLLBACK");
                }
                outcome.map_err(Self::map_err)
            }
            UpdateStateChange::Primary { pts, date, seq } => conn
                .execute(
                    "INSERT INTO update_state (id, pts, qts, date, seq) VALUES (1,?1,0,?2,?3)
                     ON CONFLICT(id) DO UPDATE SET pts=excluded.pts, date=excluded.date,
                     seq=excluded.seq",
                    rusqlite::params![pts, date, seq],
                )
                .map(|_| ())
                .map_err(Self::map_err),
            UpdateStateChange::Secondary { qts } => conn
                .execute(
                    "INSERT INTO update_state (id, pts, qts, date, seq) VALUES (1,0,?1,0,0)
                     ON CONFLICT(id) DO UPDATE SET qts = excluded.qts",
                    rusqlite::params![qts],
                )
                .map(|_| ())
                .map_err(Self::map_err),
            UpdateStateChange::Channel { id, pts } => conn
                .execute(
                    "INSERT INTO channel_pts (channel_id, pts) VALUES (?1, ?2)
                     ON CONFLICT(channel_id) DO UPDATE SET pts = excluded.pts",
                    rusqlite::params![id, pts],
                )
                .map(|_| ())
                .map_err(Self::map_err),
        }
    }

    /// Upserts one row of `peers`, keyed by `id`.
    fn cache_peer(&self, peer: &CachedPeer) -> io::Result<()> {
        let conn = self.conn.lock().unwrap();
        conn.execute(
            "INSERT INTO peers (id, access_hash, is_channel) VALUES (?1, ?2, ?3)
             ON CONFLICT(id) DO UPDATE SET
             access_hash = excluded.access_hash,
             is_channel = excluded.is_channel",
            rusqlite::params![peer.id, peer.access_hash, peer.is_channel as i32],
        )
        .map(|_| ())
        .map_err(Self::map_err)
    }
}
1059
#[cfg(feature = "libsql-session")]
/// Session backend that stores the session in a libSQL database.
///
/// The connection is shared behind `Arc<tokio::sync::Mutex<_>>`, which is the
/// shape the constructors build and the `SessionBackend` impl locks.
pub struct LibSqlBackend {
    conn: std::sync::Arc<tokio::sync::Mutex<libsql::Connection>>,
    label: String,
}

#[cfg(feature = "libsql-session")]
impl LibSqlBackend {
    /// `CREATE TABLE IF NOT EXISTS` statements run on every open.
    const SCHEMA: &'static str = "
        CREATE TABLE IF NOT EXISTS meta (
            key TEXT PRIMARY KEY,
            value INTEGER NOT NULL DEFAULT 0
        );
        CREATE TABLE IF NOT EXISTS dcs (
            dc_id INTEGER NOT NULL,
            flags INTEGER NOT NULL DEFAULT 0,
            addr TEXT NOT NULL,
            auth_key BLOB,
            first_salt INTEGER NOT NULL DEFAULT 0,
            time_offset INTEGER NOT NULL DEFAULT 0,
            PRIMARY KEY (dc_id, flags)
        );
        CREATE TABLE IF NOT EXISTS update_state (
            id INTEGER PRIMARY KEY CHECK (id = 1),
            pts INTEGER NOT NULL DEFAULT 0,
            qts INTEGER NOT NULL DEFAULT 0,
            date INTEGER NOT NULL DEFAULT 0,
            seq INTEGER NOT NULL DEFAULT 0
        );
        CREATE TABLE IF NOT EXISTS channel_pts (
            channel_id INTEGER PRIMARY KEY,
            pts INTEGER NOT NULL
        );
        CREATE TABLE IF NOT EXISTS peers (
            id INTEGER PRIMARY KEY,
            access_hash INTEGER NOT NULL,
            is_channel INTEGER NOT NULL DEFAULT 0
        );
    ";

    /// Drives `fut` to completion on the current Tokio runtime and maps a
    /// libSQL error into an `io::Error` of kind `Other`.
    ///
    /// `Handle::block_on` panics when called from asynchronous context, so
    /// the call is wrapped in `block_in_place`, which lets a multi-thread
    /// runtime worker block.
    ///
    /// # Panics
    /// Panics when there is no current Tokio runtime, or when called from a
    /// `current_thread` runtime (a `block_in_place` restriction).
    fn block<F, T>(fut: F) -> io::Result<T>
    where
        F: std::future::Future<Output = Result<T, libsql::Error>>,
    {
        tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(fut))
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
    }

    /// Creates the tables in `SCHEMA` on `conn`.
    async fn apply_schema(conn: &libsql::Connection) -> Result<(), libsql::Error> {
        conn.execute_batch(Self::SCHEMA).await.map(|_| ())
    }

    /// Connects to `db`, applies the schema and wraps the connection.
    fn finish_open(db: libsql::Database, label: String) -> io::Result<Self> {
        // `block` already yields an `io::Error`; no second wrapping is needed.
        let conn = Self::block(async { db.connect() })?;
        Self::block(Self::apply_schema(&conn))?;
        Ok(Self {
            conn: std::sync::Arc::new(tokio::sync::Mutex::new(conn)),
            label,
        })
    }

    /// Opens (or creates) a local database file at `path`.
    pub fn open_local(path: impl Into<PathBuf>) -> io::Result<Self> {
        let path = path.into();
        let label = path.display().to_string();
        let db = Self::block(async { libsql::Builder::new_local(path).build().await })?;
        Self::finish_open(db, label)
    }

    /// Opens a private in-memory database.
    pub fn in_memory() -> io::Result<Self> {
        let db = Self::block(async { libsql::Builder::new_local(":memory:").build().await })?;
        Self::finish_open(db, ":memory:".into())
    }

    /// Connects to a remote database at `url` using `auth_token`.
    pub fn open_remote(url: impl Into<String>, auth_token: impl Into<String>) -> io::Result<Self> {
        let url = url.into();
        let auth_token = auth_token.into();
        let label = url.clone();
        let db = Self::block(async {
            libsql::Builder::new_remote(url, auth_token)
                .build()
                .await
        })?;
        Self::finish_open(db, label)
    }

    /// Opens a local replica at `path` of the remote database at `url`.
    pub fn open_replica(
        path: impl Into<PathBuf>,
        url: impl Into<String>,
        auth_token: impl Into<String>,
    ) -> io::Result<Self> {
        let path = path.into();
        // Convert once: `url` is needed both for the label and the builder.
        let url = url.into();
        let auth_token = auth_token.into();
        let label = format!("{} (replica of {})", path.display(), url);
        let db = Self::block(async {
            libsql::Builder::new_remote_replica(path, url, auth_token)
                .build()
                .await
        })?;
        Self::finish_open(db, label)
    }

    /// Reads every table and assembles a `PersistedSession`.
    ///
    /// A missing `home_dc_id` row yields `0` and a missing `update_state` row
    /// yields the default snapshot. An `auth_key` blob that is not 256 bytes,
    /// or a `flags` value outside `0..=255`, is reported as
    /// `libsql::Error::Misuse`.
    async fn read_session_async(
        conn: &libsql::Connection,
    ) -> Result<PersistedSession, libsql::Error> {
        let home_dc_id: i32 = conn
            .query("SELECT value FROM meta WHERE key = 'home_dc_id'", ())
            .await?
            .next()
            .await?
            .map(|r| r.get::<i32>(0))
            .transpose()?
            .unwrap_or(0);

        let mut rows = conn
            .query(
                "SELECT dc_id, flags, addr, auth_key, first_salt, time_offset FROM dcs",
                (),
            )
            .await?;
        let mut dcs = Vec::new();
        while let Some(row) = rows.next().await? {
            let dc_id: i32 = row.get(0)?;
            let flags_wide: i64 = row.get::<i64>(1)?;
            // Reject out-of-range values instead of truncating with `as`.
            let flags_raw = u8::try_from(flags_wide).map_err(|_| {
                libsql::Error::Misuse(format!("dc flags out of range: {flags_wide}"))
            })?;
            let addr: String = row.get(2)?;
            let key_blob: Option<Vec<u8>> = row.get(3)?;
            let first_salt: i64 = row.get(4)?;
            let time_offset: i32 = row.get(5)?;
            let auth_key = match key_blob {
                Some(b) if b.len() == 256 => {
                    let mut k = [0u8; 256];
                    k.copy_from_slice(&b);
                    Some(k)
                }
                Some(b) => {
                    return Err(libsql::Error::Misuse(format!(
                        "auth_key blob must be 256 bytes, got {}",
                        b.len()
                    )));
                }
                None => None,
            };
            dcs.push(DcEntry {
                dc_id,
                addr,
                auth_key,
                first_salt,
                time_offset,
                flags: DcFlags(flags_raw),
            });
        }

        let mut us_row = conn
            .query(
                "SELECT pts, qts, date, seq FROM update_state WHERE id = 1",
                (),
            )
            .await?;
        let updates_state = if let Some(r) = us_row.next().await? {
            UpdatesStateSnap {
                pts: r.get(0)?,
                qts: r.get(1)?,
                date: r.get(2)?,
                seq: r.get(3)?,
                channels: vec![],
            }
        } else {
            UpdatesStateSnap::default()
        };

        let mut ch_rows = conn
            .query("SELECT channel_id, pts FROM channel_pts", ())
            .await?;
        let mut channels = Vec::new();
        while let Some(r) = ch_rows.next().await? {
            channels.push((r.get::<i64>(0)?, r.get::<i32>(1)?));
        }

        let mut peer_rows = conn
            .query("SELECT id, access_hash, is_channel FROM peers", ())
            .await?;
        let mut peers = Vec::new();
        while let Some(r) = peer_rows.next().await? {
            peers.push(CachedPeer {
                id: r.get(0)?,
                access_hash: r.get(1)?,
                is_channel: r.get::<i32>(2)? != 0,
            });
        }

        Ok(PersistedSession {
            home_dc_id,
            dcs,
            updates_state: UpdatesStateSnap {
                channels,
                ..updates_state
            },
            peers,
        })
    }

    /// Replaces the stored session with `s` inside one `BEGIN IMMEDIATE`
    /// transaction.
    ///
    /// If any statement (or the `COMMIT`) fails, a `ROLLBACK` is attempted so
    /// the shared connection is not left inside an open transaction.
    async fn write_session_async(
        conn: &libsql::Connection,
        s: &PersistedSession,
    ) -> Result<(), libsql::Error> {
        conn.execute_batch("BEGIN IMMEDIATE").await?;

        let outcome = match Self::write_session_rows_async(conn, s).await {
            Ok(()) => conn.execute_batch("COMMIT").await.map(|_| ()),
            Err(e) => Err(e),
        };
        if outcome.is_err() {
            // Best effort: the original error is what the caller needs to see.
            let _ = conn.execute_batch("ROLLBACK").await;
        }
        outcome
    }

    /// Issues the statements that rewrite every table from `s`.
    /// Must run inside a transaction opened by the caller.
    async fn write_session_rows_async(
        conn: &libsql::Connection,
        s: &PersistedSession,
    ) -> Result<(), libsql::Error> {
        conn.execute(
            "INSERT INTO meta (key, value) VALUES ('home_dc_id', ?1)
             ON CONFLICT(key) DO UPDATE SET value = excluded.value",
            libsql::params![s.home_dc_id],
        )
        .await?;

        conn.execute("DELETE FROM dcs", ()).await?;
        for d in &s.dcs {
            conn.execute(
                "INSERT INTO dcs (dc_id, flags, addr, auth_key, first_salt, time_offset)
                 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
                libsql::params![
                    d.dc_id,
                    d.flags.0 as i64,
                    d.addr.clone(),
                    d.auth_key.map(|k| k.to_vec()),
                    d.first_salt,
                    d.time_offset,
                ],
            )
            .await?;
        }

        let us = &s.updates_state;
        conn.execute(
            "INSERT INTO update_state (id, pts, qts, date, seq) VALUES (1,?1,?2,?3,?4)
             ON CONFLICT(id) DO UPDATE SET pts=excluded.pts,qts=excluded.qts,
             date=excluded.date,seq=excluded.seq",
            libsql::params![us.pts, us.qts, us.date, us.seq],
        )
        .await?;

        conn.execute("DELETE FROM channel_pts", ()).await?;
        for &(cid, cpts) in &us.channels {
            conn.execute(
                "INSERT INTO channel_pts (channel_id, pts) VALUES (?1,?2)",
                libsql::params![cid, cpts],
            )
            .await?;
        }

        conn.execute("DELETE FROM peers", ()).await?;
        for p in &s.peers {
            conn.execute(
                "INSERT INTO peers (id, access_hash, is_channel) VALUES (?1,?2,?3)",
                libsql::params![p.id, p.access_hash, p.is_channel as i32],
            )
            .await?;
        }

        Ok(())
    }
}
1364
#[cfg(feature = "libsql-session")]
/// libSQL implementation. Each method drives its async work to completion
/// through `Self::block` while holding the connection's async mutex.
impl SessionBackend for LibSqlBackend {
    /// Rewrites all tables from `session` in one transaction.
    fn save(&self, session: &PersistedSession) -> io::Result<()> {
        // `block` runs the future to completion before returning, so it can
        // borrow `self` and `session` and no clone of the session is needed.
        Self::block(async {
            let conn = self.conn.lock().await;
            Self::write_session_async(&conn, session).await
        })
    }

    /// Returns the stored session, or `Ok(None)` when the `meta` table is
    /// empty.
    ///
    /// The emptiness check and the read happen under a single lock
    /// acquisition, so no other call on this backend can interleave.
    ///
    /// NOTE(review): only `meta` is consulted, so rows written solely through
    /// the granular methods before any `save` or `set_home_dc` are not
    /// reported here. Confirm this is intended.
    fn load(&self) -> io::Result<Option<PersistedSession>> {
        Self::block(async {
            let conn = self.conn.lock().await;
            let mut rows = conn.query("SELECT COUNT(*) FROM meta", ()).await?;
            let count: i64 = match rows.next().await? {
                Some(row) => row.get(0)?,
                None => 0,
            };
            if count == 0 {
                return Ok(None);
            }
            Self::read_session_async(&conn).await.map(Some)
        })
    }

    /// Empties every table in one transaction.
    fn delete(&self) -> io::Result<()> {
        Self::block(async {
            let conn = self.conn.lock().await;
            let outcome = conn
                .execute_batch(
                    "BEGIN IMMEDIATE;
                     DELETE FROM meta;
                     DELETE FROM dcs;
                     DELETE FROM update_state;
                     DELETE FROM channel_pts;
                     DELETE FROM peers;
                     COMMIT;",
                )
                .await
                .map(|_| ());
            if outcome.is_err() {
                // If the batch stopped after BEGIN, close the transaction so
                // the connection stays usable. Harmless when none is open.
                let _ = conn.execute_batch("ROLLBACK").await;
            }
            outcome
        })
    }

    fn name(&self) -> &str {
        &self.label
    }

    /// Upserts one row of `dcs`, keyed by `(dc_id, flags)`.
    fn update_dc(&self, entry: &DcEntry) -> io::Result<()> {
        let (dc_id, addr, key, salt, off, flags) = (
            entry.dc_id,
            entry.addr.clone(),
            entry.auth_key.map(|k| k.to_vec()),
            entry.first_salt,
            entry.time_offset,
            entry.flags.0 as i64,
        );
        Self::block(async {
            let conn = self.conn.lock().await;
            conn.execute(
                "INSERT INTO dcs (dc_id, flags, addr, auth_key, first_salt, time_offset)
                 VALUES (?1,?6,?2,?3,?4,?5)
                 ON CONFLICT(dc_id, flags) DO UPDATE SET
                 addr=excluded.addr, auth_key=excluded.auth_key,
                 first_salt=excluded.first_salt, time_offset=excluded.time_offset",
                libsql::params![dc_id, addr, key, salt, off, flags],
            )
            .await
            .map(|_| ())
        })
    }

    /// Upserts the `home_dc_id` row of `meta`.
    fn set_home_dc(&self, dc_id: i32) -> io::Result<()> {
        Self::block(async {
            let conn = self.conn.lock().await;
            conn.execute(
                "INSERT INTO meta (key, value) VALUES ('home_dc_id',?1)
                 ON CONFLICT(key) DO UPDATE SET value=excluded.value",
                libsql::params![dc_id],
            )
            .await
            .map(|_| ())
        })
    }

    /// Applies `update` with targeted statements.
    ///
    /// `All` touches two tables with several statements, so it runs inside a
    /// transaction and is rolled back on failure. The other variants are
    /// single upserts.
    fn apply_update_state(&self, update: UpdateStateChange) -> io::Result<()> {
        Self::block(async {
            let conn = self.conn.lock().await;
            match update {
                UpdateStateChange::All(snap) => {
                    conn.execute_batch("BEGIN IMMEDIATE").await?;
                    let outcome = async {
                        conn.execute(
                            "INSERT INTO update_state (id,pts,qts,date,seq) VALUES (1,?1,?2,?3,?4)
                             ON CONFLICT(id) DO UPDATE SET pts=excluded.pts,qts=excluded.qts,
                             date=excluded.date,seq=excluded.seq",
                            libsql::params![snap.pts, snap.qts, snap.date, snap.seq],
                        )
                        .await?;
                        conn.execute("DELETE FROM channel_pts", ()).await?;
                        for &(cid, cpts) in &snap.channels {
                            conn.execute(
                                "INSERT INTO channel_pts (channel_id,pts) VALUES (?1,?2)",
                                libsql::params![cid, cpts],
                            )
                            .await?;
                        }
                        conn.execute_batch("COMMIT").await.map(|_| ())
                    }
                    .await;
                    if outcome.is_err() {
                        // Best effort; the original error is what gets reported.
                        let _ = conn.execute_batch("ROLLBACK").await;
                    }
                    outcome
                }
                UpdateStateChange::Primary { pts, date, seq } => conn
                    .execute(
                        "INSERT INTO update_state (id,pts,qts,date,seq) VALUES (1,?1,0,?2,?3)
                         ON CONFLICT(id) DO UPDATE SET pts=excluded.pts,date=excluded.date,
                         seq=excluded.seq",
                        libsql::params![pts, date, seq],
                    )
                    .await
                    .map(|_| ()),
                UpdateStateChange::Secondary { qts } => conn
                    .execute(
                        "INSERT INTO update_state (id,pts,qts,date,seq) VALUES (1,0,?1,0,0)
                         ON CONFLICT(id) DO UPDATE SET qts=excluded.qts",
                        libsql::params![qts],
                    )
                    .await
                    .map(|_| ()),
                UpdateStateChange::Channel { id, pts } => conn
                    .execute(
                        "INSERT INTO channel_pts (channel_id,pts) VALUES (?1,?2)
                         ON CONFLICT(channel_id) DO UPDATE SET pts=excluded.pts",
                        libsql::params![id, pts],
                    )
                    .await
                    .map(|_| ()),
            }
        })
    }

    /// Upserts one row of `peers`, keyed by `id`.
    fn cache_peer(&self, peer: &CachedPeer) -> io::Result<()> {
        let (id, hash, is_ch) = (peer.id, peer.access_hash, peer.is_channel as i32);
        Self::block(async {
            let conn = self.conn.lock().await;
            conn.execute(
                "INSERT INTO peers (id,access_hash,is_channel) VALUES (?1,?2,?3)
                 ON CONFLICT(id) DO UPDATE SET
                 access_hash=excluded.access_hash,
                 is_channel=excluded.is_channel",
                libsql::params![id, hash, is_ch],
            )
            .await
            .map(|_| ())
        })
    }
}