1use std::io;
34use std::path::PathBuf;
35
36use crate::session::{CachedPeer, DcEntry, PersistedSession, UpdatesStateSnap};
37
38pub trait SessionBackend: Send + Sync {
46 fn save(&self, session: &PersistedSession) -> io::Result<()>;
47 fn load(&self) -> io::Result<Option<PersistedSession>>;
48 fn delete(&self) -> io::Result<()>;
49
50 fn name(&self) -> &str;
52
53 fn update_dc(&self, entry: &DcEntry) -> io::Result<()> {
67 let mut s = self.load()?.unwrap_or_default();
68 if let Some(existing) = s.dcs.iter_mut().find(|d| d.dc_id == entry.dc_id) {
70 *existing = entry.clone();
71 } else {
72 s.dcs.push(entry.clone());
73 }
74 self.save(&s)
75 }
76
77 fn set_home_dc(&self, dc_id: i32) -> io::Result<()> {
84 let mut s = self.load()?.unwrap_or_default();
85 s.home_dc_id = dc_id;
86 self.save(&s)
87 }
88
89 fn apply_update_state(&self, update: UpdateStateChange) -> io::Result<()> {
95 let mut s = self.load()?.unwrap_or_default();
96 update.apply_to(&mut s.updates_state);
97 self.save(&s)
98 }
99
100 fn cache_peer(&self, peer: &CachedPeer) -> io::Result<()> {
107 let mut s = self.load()?.unwrap_or_default();
108 if let Some(existing) = s.peers.iter_mut().find(|p| p.id == peer.id) {
109 *existing = peer.clone();
110 } else {
111 s.peers.push(peer.clone());
112 }
113 self.save(&s)
114 }
115}
116
117#[derive(Debug, Clone)]
131pub enum UpdateStateChange {
132 All(UpdatesStateSnap),
134 Primary { pts: i32, date: i32, seq: i32 },
136 Secondary { qts: i32 },
138 Channel { id: i64, pts: i32 },
140}
141
142impl UpdateStateChange {
143 pub fn apply_to(&self, snap: &mut UpdatesStateSnap) {
145 match self {
146 Self::All(new_snap) => *snap = new_snap.clone(),
147 Self::Primary { pts, date, seq } => {
148 snap.pts = *pts;
149 snap.date = *date;
150 snap.seq = *seq;
151 }
152 Self::Secondary { qts } => {
153 snap.qts = *qts;
154 }
155 Self::Channel { id, pts } => {
156 if let Some(existing) = snap.channels.iter_mut().find(|c| c.0 == *id) {
158 existing.1 = *pts;
159 } else {
160 snap.channels.push((*id, *pts));
161 }
162 }
163 }
164 }
165}
166
167pub struct BinaryFileBackend {
171 path: PathBuf,
172}
173
174impl BinaryFileBackend {
175 pub fn new(path: impl Into<PathBuf>) -> Self {
176 Self { path: path.into() }
177 }
178
179 pub fn path(&self) -> &std::path::Path {
180 &self.path
181 }
182}
183
184impl SessionBackend for BinaryFileBackend {
185 fn save(&self, session: &PersistedSession) -> io::Result<()> {
186 session.save(&self.path)
187 }
188
189 fn load(&self) -> io::Result<Option<PersistedSession>> {
190 if !self.path.exists() {
191 return Ok(None);
192 }
193 match PersistedSession::load(&self.path) {
194 Ok(s) => Ok(Some(s)),
195 Err(e) => {
196 let bak = self.path.with_extension("bak");
197 tracing::warn!(
198 "[layer] Session file {:?} is corrupt ({e}); \
199 renaming to {:?} and starting fresh",
200 self.path,
201 bak
202 );
203 let _ = std::fs::rename(&self.path, &bak);
204 Ok(None)
205 }
206 }
207 }
208
209 fn delete(&self) -> io::Result<()> {
210 if self.path.exists() {
211 std::fs::remove_file(&self.path)?;
212 }
213 Ok(())
214 }
215
216 fn name(&self) -> &str {
217 "binary-file"
218 }
219
220 }
223
224#[derive(Default)]
232pub struct InMemoryBackend {
233 data: std::sync::Mutex<Option<PersistedSession>>,
234}
235
236impl InMemoryBackend {
237 pub fn new() -> Self {
238 Self::default()
239 }
240
241 pub fn snapshot(&self) -> Option<PersistedSession> {
243 self.data.lock().unwrap().clone()
244 }
245}
246
247impl SessionBackend for InMemoryBackend {
248 fn save(&self, s: &PersistedSession) -> io::Result<()> {
249 *self.data.lock().unwrap() = Some(s.clone());
250 Ok(())
251 }
252
253 fn load(&self) -> io::Result<Option<PersistedSession>> {
254 Ok(self.data.lock().unwrap().clone())
255 }
256
257 fn delete(&self) -> io::Result<()> {
258 *self.data.lock().unwrap() = None;
259 Ok(())
260 }
261
262 fn name(&self) -> &str {
263 "in-memory"
264 }
265
266 fn update_dc(&self, entry: &DcEntry) -> io::Result<()> {
269 let mut guard = self.data.lock().unwrap();
270 let s = guard.get_or_insert_with(PersistedSession::default);
271 if let Some(existing) = s.dcs.iter_mut().find(|d| d.dc_id == entry.dc_id) {
272 *existing = entry.clone();
273 } else {
274 s.dcs.push(entry.clone());
275 }
276 Ok(())
277 }
278
279 fn set_home_dc(&self, dc_id: i32) -> io::Result<()> {
280 let mut guard = self.data.lock().unwrap();
281 let s = guard.get_or_insert_with(PersistedSession::default);
282 s.home_dc_id = dc_id;
283 Ok(())
284 }
285
286 fn apply_update_state(&self, update: UpdateStateChange) -> io::Result<()> {
287 let mut guard = self.data.lock().unwrap();
288 let s = guard.get_or_insert_with(PersistedSession::default);
289 update.apply_to(&mut s.updates_state);
290 Ok(())
291 }
292
293 fn cache_peer(&self, peer: &CachedPeer) -> io::Result<()> {
294 let mut guard = self.data.lock().unwrap();
295 let s = guard.get_or_insert_with(PersistedSession::default);
296 if let Some(existing) = s.peers.iter_mut().find(|p| p.id == peer.id) {
297 *existing = peer.clone();
298 } else {
299 s.peers.push(peer.clone());
300 }
301 Ok(())
302 }
303}
304
305pub struct StringSessionBackend {
309 data: std::sync::Mutex<String>,
310}
311
312impl StringSessionBackend {
313 pub fn new(s: impl Into<String>) -> Self {
314 Self {
315 data: std::sync::Mutex::new(s.into()),
316 }
317 }
318
319 pub fn current(&self) -> String {
320 self.data.lock().unwrap().clone()
321 }
322}
323
324impl SessionBackend for StringSessionBackend {
325 fn save(&self, session: &PersistedSession) -> io::Result<()> {
326 *self.data.lock().unwrap() = session.to_string();
327 Ok(())
328 }
329
330 fn load(&self) -> io::Result<Option<PersistedSession>> {
331 let s = self.data.lock().unwrap().clone();
332 if s.trim().is_empty() {
333 return Ok(None);
334 }
335 PersistedSession::from_string(&s).map(Some)
336 }
337
338 fn delete(&self) -> io::Result<()> {
339 *self.data.lock().unwrap() = String::new();
340 Ok(())
341 }
342
343 fn name(&self) -> &str {
344 "string-session"
345 }
346}
347
348#[cfg(test)]
351mod tests {
352 use super::*;
353
354 fn make_dc(id: i32) -> DcEntry {
355 DcEntry {
356 dc_id: id,
357 addr: format!("1.2.3.{id}:443"),
358 auth_key: None,
359 first_salt: 0,
360 time_offset: 0,
361 flags: DcFlags::NONE,
362 }
363 }
364
365 fn make_peer(id: i64, hash: i64) -> CachedPeer {
366 CachedPeer {
367 id,
368 access_hash: hash,
369 is_channel: false,
370 }
371 }
372
373 #[test]
376 fn inmemory_load_returns_none_when_empty() {
377 let b = InMemoryBackend::new();
378 assert!(b.load().unwrap().is_none());
379 }
380
381 #[test]
382 fn inmemory_save_then_load_round_trips() {
383 let b = InMemoryBackend::new();
384 let mut s = PersistedSession::default();
385 s.home_dc_id = 3;
386 s.dcs.push(make_dc(3));
387 b.save(&s).unwrap();
388
389 let loaded = b.load().unwrap().unwrap();
390 assert_eq!(loaded.home_dc_id, 3);
391 assert_eq!(loaded.dcs.len(), 1);
392 }
393
394 #[test]
395 fn inmemory_delete_clears_state() {
396 let b = InMemoryBackend::new();
397 let mut s = PersistedSession::default();
398 s.home_dc_id = 2;
399 b.save(&s).unwrap();
400 b.delete().unwrap();
401 assert!(b.load().unwrap().is_none());
402 }
403
404 #[test]
407 fn inmemory_update_dc_inserts_new() {
408 let b = InMemoryBackend::new();
409 b.update_dc(&make_dc(4)).unwrap();
410 let s = b.snapshot().unwrap();
411 assert_eq!(s.dcs.len(), 1);
412 assert_eq!(s.dcs[0].dc_id, 4);
413 }
414
415 #[test]
416 fn inmemory_update_dc_replaces_existing() {
417 let b = InMemoryBackend::new();
418 b.update_dc(&make_dc(2)).unwrap();
419 let mut updated = make_dc(2);
420 updated.addr = "9.9.9.9:443".to_string();
421 b.update_dc(&updated).unwrap();
422
423 let s = b.snapshot().unwrap();
424 assert_eq!(s.dcs.len(), 1);
425 assert_eq!(s.dcs[0].addr, "9.9.9.9:443");
426 }
427
428 #[test]
429 fn inmemory_set_home_dc() {
430 let b = InMemoryBackend::new();
431 b.set_home_dc(5).unwrap();
432 assert_eq!(b.snapshot().unwrap().home_dc_id, 5);
433 }
434
435 #[test]
436 fn inmemory_cache_peer_inserts() {
437 let b = InMemoryBackend::new();
438 b.cache_peer(&make_peer(100, 0xdeadbeef)).unwrap();
439 let s = b.snapshot().unwrap();
440 assert_eq!(s.peers.len(), 1);
441 assert_eq!(s.peers[0].id, 100);
442 }
443
444 #[test]
445 fn inmemory_cache_peer_updates_existing() {
446 let b = InMemoryBackend::new();
447 b.cache_peer(&make_peer(100, 111)).unwrap();
448 b.cache_peer(&make_peer(100, 222)).unwrap();
449 let s = b.snapshot().unwrap();
450 assert_eq!(s.peers.len(), 1);
451 assert_eq!(s.peers[0].access_hash, 222);
452 }
453
454 #[test]
457 fn update_state_primary() {
458 let mut snap = UpdatesStateSnap {
459 pts: 0,
460 qts: 0,
461 date: 0,
462 seq: 0,
463 channels: vec![],
464 };
465 UpdateStateChange::Primary {
466 pts: 10,
467 date: 20,
468 seq: 30,
469 }
470 .apply_to(&mut snap);
471 assert_eq!(snap.pts, 10);
472 assert_eq!(snap.date, 20);
473 assert_eq!(snap.seq, 30);
474 assert_eq!(snap.qts, 0); }
476
477 #[test]
478 fn update_state_secondary() {
479 let mut snap = UpdatesStateSnap {
480 pts: 5,
481 qts: 0,
482 date: 0,
483 seq: 0,
484 channels: vec![],
485 };
486 UpdateStateChange::Secondary { qts: 99 }.apply_to(&mut snap);
487 assert_eq!(snap.qts, 99);
488 assert_eq!(snap.pts, 5); }
490
491 #[test]
492 fn update_state_channel_inserts() {
493 let mut snap = UpdatesStateSnap {
494 pts: 0,
495 qts: 0,
496 date: 0,
497 seq: 0,
498 channels: vec![],
499 };
500 UpdateStateChange::Channel { id: 12345, pts: 42 }.apply_to(&mut snap);
501 assert_eq!(snap.channels, vec![(12345, 42)]);
502 }
503
504 #[test]
505 fn update_state_channel_updates_existing() {
506 let mut snap = UpdatesStateSnap {
507 pts: 0,
508 qts: 0,
509 date: 0,
510 seq: 0,
511 channels: vec![(12345, 10), (67890, 5)],
512 };
513 UpdateStateChange::Channel { id: 12345, pts: 99 }.apply_to(&mut snap);
514 assert_eq!(snap.channels[0], (12345, 99));
516 assert_eq!(snap.channels[1], (67890, 5));
517 }
518
519 #[test]
520 fn apply_update_state_via_backend() {
521 let b = InMemoryBackend::new();
522 b.apply_update_state(UpdateStateChange::Primary {
523 pts: 7,
524 date: 8,
525 seq: 9,
526 })
527 .unwrap();
528 let s = b.snapshot().unwrap();
529 assert_eq!(s.updates_state.pts, 7);
530 }
531
532 #[test]
535 fn default_update_dc_via_trait_object() {
536 let b: Box<dyn SessionBackend> = Box::new(InMemoryBackend::new());
537 b.update_dc(&make_dc(1)).unwrap();
538 b.update_dc(&make_dc(2)).unwrap();
539 let loaded = b.load().unwrap().unwrap();
541 assert_eq!(loaded.dcs.len(), 2);
542 }
543
544 fn make_dc_v6(id: i32) -> DcEntry {
547 DcEntry {
548 dc_id: id,
549 addr: format!("[2001:b28:f23d:f00{}::a]:443", id),
550 auth_key: None,
551 first_salt: 0,
552 time_offset: 0,
553 flags: DcFlags::IPV6,
554 }
555 }
556
557 #[test]
558 fn dc_entry_from_parts_ipv4() {
559 let dc = DcEntry::from_parts(1, "149.154.175.53", 443, DcFlags::NONE);
560 assert_eq!(dc.addr, "149.154.175.53:443");
561 assert!(!dc.is_ipv6());
562 let sa = dc.socket_addr().unwrap();
563 assert_eq!(sa.port(), 443);
564 }
565
566 #[test]
567 fn dc_entry_from_parts_ipv6() {
568 let dc = DcEntry::from_parts(2, "2001:b28:f23d:f001::a", 443, DcFlags::IPV6);
569 assert_eq!(dc.addr, "[2001:b28:f23d:f001::a]:443");
570 assert!(dc.is_ipv6());
571 let sa = dc.socket_addr().unwrap();
572 assert_eq!(sa.port(), 443);
573 }
574
575 #[test]
576 fn persisted_session_dc_for_prefers_ipv6() {
577 let mut s = PersistedSession::default();
578 s.dcs.push(make_dc(2)); s.dcs.push(make_dc_v6(2)); let v6 = s.dc_for(2, true).unwrap();
582 assert!(v6.is_ipv6());
583
584 let v4 = s.dc_for(2, false).unwrap();
585 assert!(!v4.is_ipv6());
586 }
587
588 #[test]
589 fn persisted_session_dc_for_falls_back_when_only_ipv4() {
590 let mut s = PersistedSession::default();
591 s.dcs.push(make_dc(3)); let dc = s.dc_for(3, true).unwrap();
595 assert!(!dc.is_ipv6());
596 }
597
598 #[test]
599 fn persisted_session_all_dcs_for_returns_both() {
600 let mut s = PersistedSession::default();
601 s.dcs.push(make_dc(1));
602 s.dcs.push(make_dc_v6(1));
603 s.dcs.push(make_dc(2));
604
605 assert_eq!(s.all_dcs_for(1).count(), 2);
606 assert_eq!(s.all_dcs_for(2).count(), 1);
607 assert_eq!(s.all_dcs_for(5).count(), 0);
608 }
609
610 #[test]
611 fn inmemory_ipv4_and_ipv6_coexist() {
612 let b = InMemoryBackend::new();
613 b.update_dc(&make_dc(2)).unwrap(); b.update_dc(&make_dc_v6(2)).unwrap(); let s = b.snapshot().unwrap();
617 assert_eq!(s.dcs.iter().filter(|d| d.dc_id == 2).count(), 2);
619 }
620
621 #[test]
622 fn binary_roundtrip_ipv4_and_ipv6() {
623 let mut s = PersistedSession::default();
624 s.home_dc_id = 2;
625 s.dcs.push(make_dc(2));
626 s.dcs.push(make_dc_v6(2));
627
628 let bytes = s.to_bytes();
629 let loaded = PersistedSession::from_bytes(&bytes).unwrap();
630 assert_eq!(loaded.dcs.len(), 2);
631 assert_eq!(loaded.dcs.iter().filter(|d| d.is_ipv6()).count(), 1);
632 assert_eq!(loaded.dcs.iter().filter(|d| !d.is_ipv6()).count(), 1);
633 }
634}
635
636#[cfg(feature = "sqlite-session")]
661pub struct SqliteBackend {
662 conn: std::sync::Mutex<rusqlite::Connection>,
663 label: String,
664}
665
666#[cfg(feature = "sqlite-session")]
667impl SqliteBackend {
668 const SCHEMA: &'static str = "
669 PRAGMA journal_mode = WAL;
670 PRAGMA synchronous = NORMAL;
671
672 CREATE TABLE IF NOT EXISTS meta (
673 key TEXT PRIMARY KEY,
674 value INTEGER NOT NULL DEFAULT 0
675 );
676
677 CREATE TABLE IF NOT EXISTS dcs (
678 dc_id INTEGER NOT NULL,
679 flags INTEGER NOT NULL DEFAULT 0,
680 addr TEXT NOT NULL,
681 auth_key BLOB,
682 first_salt INTEGER NOT NULL DEFAULT 0,
683 time_offset INTEGER NOT NULL DEFAULT 0,
684 PRIMARY KEY (dc_id, flags)
685 );
686
687 CREATE TABLE IF NOT EXISTS update_state (
688 id INTEGER PRIMARY KEY CHECK (id = 1),
689 pts INTEGER NOT NULL DEFAULT 0,
690 qts INTEGER NOT NULL DEFAULT 0,
691 date INTEGER NOT NULL DEFAULT 0,
692 seq INTEGER NOT NULL DEFAULT 0
693 );
694
695 CREATE TABLE IF NOT EXISTS channel_pts (
696 channel_id INTEGER PRIMARY KEY,
697 pts INTEGER NOT NULL
698 );
699
700 CREATE TABLE IF NOT EXISTS peers (
701 id INTEGER PRIMARY KEY,
702 access_hash INTEGER NOT NULL,
703 is_channel INTEGER NOT NULL DEFAULT 0
704 );
705 ";
706
707 pub fn open(path: impl Into<PathBuf>) -> io::Result<Self> {
709 let path = path.into();
710 let label = path.display().to_string();
711 let conn = rusqlite::Connection::open(&path)
712 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
713 conn.execute_batch(Self::SCHEMA)
714 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
715 Ok(Self {
716 conn: std::sync::Mutex::new(conn),
717 label,
718 })
719 }
720
721 pub fn in_memory() -> io::Result<Self> {
723 let conn = rusqlite::Connection::open_in_memory()
724 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
725 conn.execute_batch(Self::SCHEMA)
726 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
727 Ok(Self {
728 conn: std::sync::Mutex::new(conn),
729 label: ":memory:".into(),
730 })
731 }
732
733 fn map_err(e: rusqlite::Error) -> io::Error {
734 io::Error::new(io::ErrorKind::Other, e)
735 }
736
737 fn read_session(conn: &rusqlite::Connection) -> io::Result<PersistedSession> {
739 let home_dc_id: i32 = conn
741 .query_row("SELECT value FROM meta WHERE key = 'home_dc_id'", [], |r| {
742 r.get(0)
743 })
744 .unwrap_or(0);
745
746 let mut stmt = conn
748 .prepare("SELECT dc_id, flags, addr, auth_key, first_salt, time_offset FROM dcs")
749 .map_err(Self::map_err)?;
750 let dcs = stmt
751 .query_map([], |row| {
752 let dc_id: i32 = row.get(0)?;
753 let flags_raw: u8 = row.get(1)?;
754 let addr: String = row.get(2)?;
755 let key_blob: Option<Vec<u8>> = row.get(3)?;
756 let first_salt: i64 = row.get(4)?;
757 let time_offset: i32 = row.get(5)?;
758 Ok((dc_id, addr, key_blob, first_salt, time_offset, flags_raw))
759 })
760 .map_err(Self::map_err)?
761 .filter_map(|r| r.ok())
762 .map(
763 |(dc_id, addr, key_blob, first_salt, time_offset, flags_raw)| {
764 let auth_key = key_blob.and_then(|b| {
765 if b.len() == 256 {
766 let mut k = [0u8; 256];
767 k.copy_from_slice(&b);
768 Some(k)
769 } else {
770 None
771 }
772 });
773 DcEntry {
774 dc_id,
775 addr,
776 auth_key,
777 first_salt,
778 time_offset,
779 flags: DcFlags(flags_raw),
780 }
781 },
782 )
783 .collect();
784
785 let updates_state = conn
787 .query_row(
788 "SELECT pts, qts, date, seq FROM update_state WHERE id = 1",
789 [],
790 |r| {
791 Ok(UpdatesStateSnap {
792 pts: r.get(0)?,
793 qts: r.get(1)?,
794 date: r.get(2)?,
795 seq: r.get(3)?,
796 channels: vec![],
797 })
798 },
799 )
800 .unwrap_or_default();
801
802 let mut ch_stmt = conn
804 .prepare("SELECT channel_id, pts FROM channel_pts")
805 .map_err(Self::map_err)?;
806 let channels: Vec<(i64, i32)> = ch_stmt
807 .query_map([], |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i32>(1)?)))
808 .map_err(Self::map_err)?
809 .filter_map(|r| r.ok())
810 .collect();
811
812 let mut peer_stmt = conn
814 .prepare("SELECT id, access_hash, is_channel FROM peers")
815 .map_err(Self::map_err)?;
816 let peers: Vec<CachedPeer> = peer_stmt
817 .query_map([], |r| {
818 Ok(CachedPeer {
819 id: r.get(0)?,
820 access_hash: r.get(1)?,
821 is_channel: r.get::<_, i32>(2)? != 0,
822 })
823 })
824 .map_err(Self::map_err)?
825 .filter_map(|r| r.ok())
826 .collect();
827
828 Ok(PersistedSession {
829 home_dc_id,
830 dcs,
831 updates_state: UpdatesStateSnap {
832 channels,
833 ..updates_state
834 },
835 peers,
836 })
837 }
838
839 fn write_session(conn: &rusqlite::Connection, s: &PersistedSession) -> io::Result<()> {
841 conn.execute_batch("BEGIN IMMEDIATE")
842 .map_err(Self::map_err)?;
843
844 conn.execute(
845 "INSERT INTO meta (key, value) VALUES ('home_dc_id', ?1)
846 ON CONFLICT(key) DO UPDATE SET value = excluded.value",
847 rusqlite::params![s.home_dc_id],
848 )
849 .map_err(Self::map_err)?;
850
851 conn.execute("DELETE FROM dcs", []).map_err(Self::map_err)?;
853 for d in &s.dcs {
854 conn.execute(
855 "INSERT INTO dcs (dc_id, flags, addr, auth_key, first_salt, time_offset)
856 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
857 rusqlite::params![
858 d.dc_id,
859 d.flags.0,
860 d.addr,
861 d.auth_key.as_ref().map(|k| k.as_ref()),
862 d.first_salt,
863 d.time_offset,
864 ],
865 )
866 .map_err(Self::map_err)?;
867 }
868
869 let us = &s.updates_state;
871 conn.execute(
872 "INSERT INTO update_state (id, pts, qts, date, seq) VALUES (1, ?1, ?2, ?3, ?4)
873 ON CONFLICT(id) DO UPDATE SET pts=excluded.pts, qts=excluded.qts,
874 date=excluded.date, seq=excluded.seq",
875 rusqlite::params![us.pts, us.qts, us.date, us.seq],
876 )
877 .map_err(Self::map_err)?;
878
879 conn.execute("DELETE FROM channel_pts", [])
880 .map_err(Self::map_err)?;
881 for &(cid, cpts) in &us.channels {
882 conn.execute(
883 "INSERT INTO channel_pts (channel_id, pts) VALUES (?1, ?2)",
884 rusqlite::params![cid, cpts],
885 )
886 .map_err(Self::map_err)?;
887 }
888
889 conn.execute("DELETE FROM peers", [])
891 .map_err(Self::map_err)?;
892 for p in &s.peers {
893 conn.execute(
894 "INSERT INTO peers (id, access_hash, is_channel) VALUES (?1, ?2, ?3)",
895 rusqlite::params![p.id, p.access_hash, p.is_channel as i32],
896 )
897 .map_err(Self::map_err)?;
898 }
899
900 conn.execute_batch("COMMIT").map_err(Self::map_err)
901 }
902}
903
904#[cfg(feature = "sqlite-session")]
905impl SessionBackend for SqliteBackend {
906 fn save(&self, session: &PersistedSession) -> io::Result<()> {
907 let conn = self.conn.lock().unwrap();
908 Self::write_session(&conn, session)
909 }
910
911 fn load(&self) -> io::Result<Option<PersistedSession>> {
912 let conn = self.conn.lock().unwrap();
913 let count: i64 = conn
915 .query_row("SELECT COUNT(*) FROM meta", [], |r| r.get(0))
916 .map_err(Self::map_err)?;
917 if count == 0 {
918 return Ok(None);
919 }
920 Self::read_session(&conn).map(Some)
921 }
922
923 fn delete(&self) -> io::Result<()> {
924 let conn = self.conn.lock().unwrap();
925 conn.execute_batch(
926 "BEGIN IMMEDIATE;
927 DELETE FROM meta;
928 DELETE FROM dcs;
929 DELETE FROM update_state;
930 DELETE FROM channel_pts;
931 DELETE FROM peers;
932 COMMIT;",
933 )
934 .map_err(Self::map_err)
935 }
936
937 fn name(&self) -> &str {
938 &self.label
939 }
940
941 fn update_dc(&self, entry: &DcEntry) -> io::Result<()> {
944 let conn = self.conn.lock().unwrap();
945 conn.execute(
946 "INSERT INTO dcs (dc_id, flags, addr, auth_key, first_salt, time_offset)
947 VALUES (?1, ?6, ?2, ?3, ?4, ?5)
948 ON CONFLICT(dc_id, flags) DO UPDATE SET
949 addr = excluded.addr,
950 auth_key = excluded.auth_key,
951 first_salt = excluded.first_salt,
952 time_offset = excluded.time_offset",
953 rusqlite::params![
954 entry.dc_id,
955 entry.addr,
956 entry.auth_key.as_ref().map(|k| k.as_ref()),
957 entry.first_salt,
958 entry.time_offset,
959 entry.flags.0,
960 ],
961 )
962 .map(|_| ())
963 .map_err(Self::map_err)
964 }
965
966 fn set_home_dc(&self, dc_id: i32) -> io::Result<()> {
967 let conn = self.conn.lock().unwrap();
968 conn.execute(
969 "INSERT INTO meta (key, value) VALUES ('home_dc_id', ?1)
970 ON CONFLICT(key) DO UPDATE SET value = excluded.value",
971 rusqlite::params![dc_id],
972 )
973 .map(|_| ())
974 .map_err(Self::map_err)
975 }
976
977 fn apply_update_state(&self, update: UpdateStateChange) -> io::Result<()> {
978 let conn = self.conn.lock().unwrap();
979 match update {
980 UpdateStateChange::All(snap) => {
981 conn.execute(
982 "INSERT INTO update_state (id, pts, qts, date, seq) VALUES (1,?1,?2,?3,?4)
983 ON CONFLICT(id) DO UPDATE SET
984 pts=excluded.pts, qts=excluded.qts,
985 date=excluded.date, seq=excluded.seq",
986 rusqlite::params![snap.pts, snap.qts, snap.date, snap.seq],
987 )
988 .map_err(Self::map_err)?;
989 conn.execute("DELETE FROM channel_pts", [])
990 .map_err(Self::map_err)?;
991 for &(cid, cpts) in &snap.channels {
992 conn.execute(
993 "INSERT INTO channel_pts (channel_id, pts) VALUES (?1, ?2)",
994 rusqlite::params![cid, cpts],
995 )
996 .map_err(Self::map_err)?;
997 }
998 Ok(())
999 }
1000 UpdateStateChange::Primary { pts, date, seq } => conn
1001 .execute(
1002 "INSERT INTO update_state (id, pts, qts, date, seq) VALUES (1,?1,0,?2,?3)
1003 ON CONFLICT(id) DO UPDATE SET pts=excluded.pts, date=excluded.date,
1004 seq=excluded.seq",
1005 rusqlite::params![pts, date, seq],
1006 )
1007 .map(|_| ())
1008 .map_err(Self::map_err),
1009 UpdateStateChange::Secondary { qts } => conn
1010 .execute(
1011 "INSERT INTO update_state (id, pts, qts, date, seq) VALUES (1,0,?1,0,0)
1012 ON CONFLICT(id) DO UPDATE SET qts = excluded.qts",
1013 rusqlite::params![qts],
1014 )
1015 .map(|_| ())
1016 .map_err(Self::map_err),
1017 UpdateStateChange::Channel { id, pts } => conn
1018 .execute(
1019 "INSERT INTO channel_pts (channel_id, pts) VALUES (?1, ?2)
1020 ON CONFLICT(channel_id) DO UPDATE SET pts = excluded.pts",
1021 rusqlite::params![id, pts],
1022 )
1023 .map(|_| ())
1024 .map_err(Self::map_err),
1025 }
1026 }
1027
1028 fn cache_peer(&self, peer: &CachedPeer) -> io::Result<()> {
1029 let conn = self.conn.lock().unwrap();
1030 conn.execute(
1031 "INSERT INTO peers (id, access_hash, is_channel) VALUES (?1, ?2, ?3)
1032 ON CONFLICT(id) DO UPDATE SET
1033 access_hash = excluded.access_hash,
1034 is_channel = excluded.is_channel",
1035 rusqlite::params![peer.id, peer.access_hash, peer.is_channel as i32],
1036 )
1037 .map(|_| ())
1038 .map_err(Self::map_err)
1039 }
1040}
1041
1042#[cfg(feature = "libsql-session")]
1062pub struct LibSqlBackend {
1063 conn: libsql::Connection,
1064 label: String,
1065}
1066
1067#[cfg(feature = "libsql-session")]
1068impl LibSqlBackend {
1069 const SCHEMA: &'static str = "
1070 CREATE TABLE IF NOT EXISTS meta (
1071 key TEXT PRIMARY KEY,
1072 value INTEGER NOT NULL DEFAULT 0
1073 );
1074 CREATE TABLE IF NOT EXISTS dcs (
1075 dc_id INTEGER NOT NULL,
1076 flags INTEGER NOT NULL DEFAULT 0,
1077 addr TEXT NOT NULL,
1078 auth_key BLOB,
1079 first_salt INTEGER NOT NULL DEFAULT 0,
1080 time_offset INTEGER NOT NULL DEFAULT 0,
1081 PRIMARY KEY (dc_id, flags)
1082 );
1083 CREATE TABLE IF NOT EXISTS update_state (
1084 id INTEGER PRIMARY KEY CHECK (id = 1),
1085 pts INTEGER NOT NULL DEFAULT 0,
1086 qts INTEGER NOT NULL DEFAULT 0,
1087 date INTEGER NOT NULL DEFAULT 0,
1088 seq INTEGER NOT NULL DEFAULT 0
1089 );
1090 CREATE TABLE IF NOT EXISTS channel_pts (
1091 channel_id INTEGER PRIMARY KEY,
1092 pts INTEGER NOT NULL
1093 );
1094 CREATE TABLE IF NOT EXISTS peers (
1095 id INTEGER PRIMARY KEY,
1096 access_hash INTEGER NOT NULL,
1097 is_channel INTEGER NOT NULL DEFAULT 0
1098 );
1099 ";
1100
1101 fn block<F, T>(fut: F) -> io::Result<T>
1102 where
1103 F: std::future::Future<Output = Result<T, libsql::Error>>,
1104 {
1105 tokio::runtime::Handle::current()
1106 .block_on(fut)
1107 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
1108 }
1109
1110 async fn apply_schema(conn: &libsql::Connection) -> Result<(), libsql::Error> {
1111 conn.execute_batch(Self::SCHEMA).await
1112 }
1113
1114 pub fn open_local(path: impl Into<PathBuf>) -> io::Result<Self> {
1116 let path = path.into();
1117 let label = path.display().to_string();
1118 let db = Self::block(async { libsql::Builder::new_local(path).build().await })?;
1119 let conn = Self::block(async { db.connect() })
1120 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
1121 Self::block(Self::apply_schema(&conn))?;
1122 Ok(Self {
1123 conn: std::sync::Arc::new(tokio::sync::Mutex::new(conn)),
1124 label,
1125 })
1126 }
1127
1128 pub fn in_memory() -> io::Result<Self> {
1130 let db = Self::block(async { libsql::Builder::new_local(":memory:").build().await })?;
1131 let conn = Self::block(async { db.connect() })
1132 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
1133 Self::block(Self::apply_schema(&conn))?;
1134 Ok(Self {
1135 conn: std::sync::Arc::new(tokio::sync::Mutex::new(conn)),
1136 label: ":memory:".into(),
1137 })
1138 }
1139
1140 pub fn open_remote(url: impl Into<String>, auth_token: impl Into<String>) -> io::Result<Self> {
1142 let url = url.into();
1143 let label = url.clone();
1144 let db = Self::block(async {
1145 libsql::Builder::new_remote(url, auth_token.into())
1146 .build()
1147 .await
1148 })?;
1149 let conn = Self::block(async { db.connect() })
1150 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
1151 Self::block(Self::apply_schema(&conn))?;
1152 Ok(Self {
1153 conn: std::sync::Arc::new(tokio::sync::Mutex::new(conn)),
1154 label,
1155 })
1156 }
1157
1158 pub fn open_replica(
1160 path: impl Into<PathBuf>,
1161 url: impl Into<String>,
1162 auth_token: impl Into<String>,
1163 ) -> io::Result<Self> {
1164 let path = path.into();
1165 let label = format!("{} (replica of {})", path.display(), url.into());
1166 let db = Self::block(async {
1167 libsql::Builder::new_remote_replica(path, url.into(), auth_token.into())
1168 .build()
1169 .await
1170 })?;
1171 let conn = Self::block(async { db.connect() })
1172 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
1173 Self::block(Self::apply_schema(&conn))?;
1174 Ok(Self {
1175 conn: std::sync::Arc::new(tokio::sync::Mutex::new(conn)),
1176 label,
1177 })
1178 }
1179
1180 async fn read_session_async(
1181 conn: &libsql::Connection,
1182 ) -> Result<PersistedSession, libsql::Error> {
1183 use libsql::de;
1184
1185 let home_dc_id: i32 = conn
1187 .query("SELECT value FROM meta WHERE key = 'home_dc_id'", ())
1188 .await?
1189 .next()
1190 .await?
1191 .map(|r| r.get::<i32>(0))
1192 .transpose()?
1193 .unwrap_or(0);
1194
1195 let mut rows = conn
1197 .query(
1198 "SELECT dc_id, flags, addr, auth_key, first_salt, time_offset FROM dcs",
1199 (),
1200 )
1201 .await?;
1202 let mut dcs = Vec::new();
1203 while let Some(row) = rows.next().await? {
1204 let dc_id: i32 = row.get(0)?;
1205 let flags_raw: u8 = row.get::<i64>(1)? as u8;
1206 let addr: String = row.get(2)?;
1207 let key_blob: Option<Vec<u8>> = row.get(3)?;
1208 let first_salt: i64 = row.get(4)?;
1209 let time_offset: i32 = row.get(5)?;
1210 let auth_key = match key_blob {
1211 Some(b) if b.len() == 256 => {
1212 let mut k = [0u8; 256];
1213 k.copy_from_slice(&b);
1214 Some(k)
1215 }
1216 Some(b) => {
1217 return Err(libsql::Error::Misuse(format!(
1218 "auth_key blob must be 256 bytes, got {}",
1219 b.len()
1220 )));
1221 }
1222 None => None,
1223 };
1224 dcs.push(DcEntry {
1225 dc_id,
1226 addr,
1227 auth_key,
1228 first_salt,
1229 time_offset,
1230 flags: DcFlags(flags_raw),
1231 });
1232 }
1233
1234 let mut us_row = conn
1236 .query(
1237 "SELECT pts, qts, date, seq FROM update_state WHERE id = 1",
1238 (),
1239 )
1240 .await?;
1241 let updates_state = if let Some(r) = us_row.next().await? {
1242 UpdatesStateSnap {
1243 pts: r.get(0)?,
1244 qts: r.get(1)?,
1245 date: r.get(2)?,
1246 seq: r.get(3)?,
1247 channels: vec![],
1248 }
1249 } else {
1250 UpdatesStateSnap::default()
1251 };
1252
1253 let mut ch_rows = conn
1255 .query("SELECT channel_id, pts FROM channel_pts", ())
1256 .await?;
1257 let mut channels = Vec::new();
1258 while let Some(r) = ch_rows.next().await? {
1259 channels.push((r.get::<i64>(0)?, r.get::<i32>(1)?));
1260 }
1261
1262 let mut peer_rows = conn
1264 .query("SELECT id, access_hash, is_channel FROM peers", ())
1265 .await?;
1266 let mut peers = Vec::new();
1267 while let Some(r) = peer_rows.next().await? {
1268 peers.push(CachedPeer {
1269 id: r.get(0)?,
1270 access_hash: r.get(1)?,
1271 is_channel: r.get::<i32>(2)? != 0,
1272 });
1273 }
1274
1275 Ok(PersistedSession {
1276 home_dc_id,
1277 dcs,
1278 updates_state: UpdatesStateSnap {
1279 channels,
1280 ..updates_state
1281 },
1282 peers,
1283 })
1284 }
1285
1286 async fn write_session_async(
1287 conn: &libsql::Connection,
1288 s: &PersistedSession,
1289 ) -> Result<(), libsql::Error> {
1290 conn.execute_batch("BEGIN IMMEDIATE").await?;
1291
1292 conn.execute(
1293 "INSERT INTO meta (key, value) VALUES ('home_dc_id', ?1)
1294 ON CONFLICT(key) DO UPDATE SET value = excluded.value",
1295 libsql::params![s.home_dc_id],
1296 )
1297 .await?;
1298
1299 conn.execute("DELETE FROM dcs", ()).await?;
1300 for d in &s.dcs {
1301 conn.execute(
1302 "INSERT INTO dcs (dc_id, flags, addr, auth_key, first_salt, time_offset)
1303 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1304 libsql::params![
1305 d.dc_id,
1306 d.flags.0 as i64,
1307 d.addr.clone(),
1308 d.auth_key.map(|k| k.to_vec()),
1309 d.first_salt,
1310 d.time_offset,
1311 ],
1312 )
1313 .await?;
1314 }
1315
1316 let us = &s.updates_state;
1317 conn.execute(
1318 "INSERT INTO update_state (id, pts, qts, date, seq) VALUES (1,?1,?2,?3,?4)
1319 ON CONFLICT(id) DO UPDATE SET pts=excluded.pts,qts=excluded.qts,
1320 date=excluded.date,seq=excluded.seq",
1321 libsql::params![us.pts, us.qts, us.date, us.seq],
1322 )
1323 .await?;
1324
1325 conn.execute("DELETE FROM channel_pts", ()).await?;
1326 for &(cid, cpts) in &us.channels {
1327 conn.execute(
1328 "INSERT INTO channel_pts (channel_id, pts) VALUES (?1,?2)",
1329 libsql::params![cid, cpts],
1330 )
1331 .await?;
1332 }
1333
1334 conn.execute("DELETE FROM peers", ()).await?;
1335 for p in &s.peers {
1336 conn.execute(
1337 "INSERT INTO peers (id, access_hash, is_channel) VALUES (?1,?2,?3)",
1338 libsql::params![p.id, p.access_hash, p.is_channel as i32],
1339 )
1340 .await?;
1341 }
1342
1343 conn.execute_batch("COMMIT").await
1344 }
1345}
1346
1347#[cfg(feature = "libsql-session")]
1348impl SessionBackend for LibSqlBackend {
1349 fn save(&self, session: &PersistedSession) -> io::Result<()> {
1350 let conn = self.conn.clone();
1351 let session = session.clone();
1352 Self::block(async move {
1353 let conn = conn.lock().await;
1354 Self::write_session_async(&conn, session).await
1355 })
1356 }
1357
1358 fn load(&self) -> io::Result<Option<PersistedSession>> {
1359 let conn = self.conn.clone();
1360 let count: i64 = Self::block(async move {
1361 let conn = conn.lock().await;
1362 let mut rows = conn.query("SELECT COUNT(*) FROM meta", ()).await?;
1363 Ok::<i64, libsql::Error>(rows.next().await?.and_then(|r| r.get(0).ok()).unwrap_or(0))
1364 })?;
1365 if count == 0 {
1366 return Ok(None);
1367 }
1368 let conn = self.conn.clone();
1369 Self::block(async move {
1370 let conn = conn.lock().await;
1371 Self::read_session_async(&conn).await
1372 })
1373 .map(Some)
1374 }
1375
1376 fn delete(&self) -> io::Result<()> {
1377 let conn = self.conn.clone();
1378 Self::block(async move {
1379 let conn = conn.lock().await;
1380 conn.execute_batch(
1381 "BEGIN IMMEDIATE;
1382 DELETE FROM meta;
1383 DELETE FROM dcs;
1384 DELETE FROM update_state;
1385 DELETE FROM channel_pts;
1386 DELETE FROM peers;
1387 COMMIT;",
1388 )
1389 .await
1390 })
1391 }
1392
1393 fn name(&self) -> &str {
1394 &self.label
1395 }
1396
1397 fn update_dc(&self, entry: &DcEntry) -> io::Result<()> {
1400 let conn = self.conn.clone();
1401 let (dc_id, addr, key, salt, off, flags) = (
1402 entry.dc_id,
1403 entry.addr.clone(),
1404 entry.auth_key.map(|k| k.to_vec()),
1405 entry.first_salt,
1406 entry.time_offset,
1407 entry.flags.0 as i64,
1408 );
1409 Self::block(async move {
1410 let conn = conn.lock().await;
1411 conn.execute(
1412 "INSERT INTO dcs (dc_id, flags, addr, auth_key, first_salt, time_offset)
1413 VALUES (?1,?6,?2,?3,?4,?5)
1414 ON CONFLICT(dc_id, flags) DO UPDATE SET
1415 addr=excluded.addr, auth_key=excluded.auth_key,
1416 first_salt=excluded.first_salt, time_offset=excluded.time_offset",
1417 libsql::params![dc_id, addr, key, salt, off, flags],
1418 )
1419 .await
1420 .map(|_| ())
1421 })
1422 }
1423
1424 fn set_home_dc(&self, dc_id: i32) -> io::Result<()> {
1425 let conn = self.conn.clone();
1426 Self::block(async move {
1427 let conn = conn.lock().await;
1428 conn.execute(
1429 "INSERT INTO meta (key, value) VALUES ('home_dc_id',?1)
1430 ON CONFLICT(key) DO UPDATE SET value=excluded.value",
1431 libsql::params![dc_id],
1432 )
1433 .await
1434 .map(|_| ())
1435 })
1436 }
1437
1438 fn apply_update_state(&self, update: UpdateStateChange) -> io::Result<()> {
1439 let conn = self.conn.clone();
1440 Self::block(async move {
1441 let conn = conn.lock().await;
1442 match update {
1443 UpdateStateChange::All(snap) => {
1444 conn.execute(
1445 "INSERT INTO update_state (id,pts,qts,date,seq) VALUES (1,?1,?2,?3,?4)
1446 ON CONFLICT(id) DO UPDATE SET pts=excluded.pts,qts=excluded.qts,
1447 date=excluded.date,seq=excluded.seq",
1448 libsql::params![snap.pts, snap.qts, snap.date, snap.seq],
1449 )
1450 .await?;
1451 conn.execute("DELETE FROM channel_pts", ()).await?;
1452 for &(cid, cpts) in &snap.channels {
1453 conn.execute(
1454 "INSERT INTO channel_pts (channel_id,pts) VALUES (?1,?2)",
1455 libsql::params![cid, cpts],
1456 )
1457 .await?;
1458 }
1459 Ok(())
1460 }
1461 UpdateStateChange::Primary { pts, date, seq } => conn
1462 .execute(
1463 "INSERT INTO update_state (id,pts,qts,date,seq) VALUES (1,?1,0,?2,?3)
1464 ON CONFLICT(id) DO UPDATE SET pts=excluded.pts,date=excluded.date,
1465 seq=excluded.seq",
1466 libsql::params![pts, date, seq],
1467 )
1468 .await
1469 .map(|_| ()),
1470 UpdateStateChange::Secondary { qts } => conn
1471 .execute(
1472 "INSERT INTO update_state (id,pts,qts,date,seq) VALUES (1,0,?1,0,0)
1473 ON CONFLICT(id) DO UPDATE SET qts=excluded.qts",
1474 libsql::params![qts],
1475 )
1476 .await
1477 .map(|_| ()),
1478 UpdateStateChange::Channel { id, pts } => conn
1479 .execute(
1480 "INSERT INTO channel_pts (channel_id,pts) VALUES (?1,?2)
1481 ON CONFLICT(channel_id) DO UPDATE SET pts=excluded.pts",
1482 libsql::params![id, pts],
1483 )
1484 .await
1485 .map(|_| ()),
1486 }
1487 })
1488 }
1489
1490 fn cache_peer(&self, peer: &CachedPeer) -> io::Result<()> {
1491 let conn = self.conn.clone();
1492 let (id, hash, is_ch) = (peer.id, peer.access_hash, peer.is_channel as i32);
1493 Self::block(async move {
1494 let conn = conn.lock().await;
1495 conn.execute(
1496 "INSERT INTO peers (id,access_hash,is_channel) VALUES (?1,?2,?3)
1497 ON CONFLICT(id) DO UPDATE SET
1498 access_hash=excluded.access_hash,
1499 is_channel=excluded.is_channel",
1500 libsql::params![id, hash, is_ch],
1501 )
1502 .await
1503 .map(|_| ())
1504 })
1505 }
1506}