use std::path::Path;
use std::sync::Arc;
use std::sync::Mutex;
use eyre::Result;
use matrix_sdk::config::SyncSettings;
use matrix_sdk::sync::SyncResponse;
use matrix_sdk::{Client, LoopCtrl};
use rusqlite::OptionalExtension;
use tokio_stream::StreamExt;
use tracing::{debug, instrument, trace};
use crate::db::SQLiteHelper;
/// Handle for loading, caching and persisting the sync token.
///
/// The state lives behind an `Arc<Mutex<_>>`, so cloning this handle yields
/// another handle to the same underlying state rather than an independent copy.
#[derive(Clone, Debug)]
pub struct SyncHelper {
/// Shared state (database handle plus cached token), guarded by a mutex.
inner: Arc<Mutex<SyncHelperInner>>,
}
/// State shared between all clones of a `SyncHelper`.
#[derive(Debug)]
struct SyncHelperInner {
/// Database handle used to read and write the `sync_token` table.
session_db: SQLiteHelper,
/// In-memory copy of the token most recently loaded from or written to
/// `session_db`; `None` when no token has been stored yet.
sync_token: Option<String>,
}
impl SyncHelper {
#[instrument(name = "SyncHelper", skip_all)]
pub fn new(data_dir: &Path) -> Result<Self> {
Self::from_opened_db(SQLiteHelper::open(
&data_dir.join("matrixbot-ezlogin.sqlite3"),
false,
)?)
}
pub(crate) fn from_opened_db(session_db: SQLiteHelper) -> Result<Self> {
let sync_token = session_db
.query_row("SELECT token FROM sync_token WHERE id = 0;", (), |row| {
row.get(0)
})
.optional()?;
Ok(Self {
inner: Arc::new(Mutex::new(SyncHelperInner {
session_db,
sync_token,
})),
})
}
pub fn get_sync_token(&self) -> Option<String> {
let token = self
.inner
.lock()
.unwrap()
.sync_token
.clone();
debug!("Current sync token: {}", token.as_deref().unwrap_or("None"));
token
}
pub fn set_sync_token(&self, token: String) -> Result<()> {
debug!("Next sync token: {}", token);
let mut inner = self
.inner
.lock()
.unwrap();
inner
.session_db
.prepare_cached("INSERT OR REPLACE INTO sync_token (id, token) VALUES (0, ?);")?
.execute((&token,))?;
inner.sync_token = Some(token);
Ok(())
}
pub fn process_sync_settings(&self, mut sync_settings: SyncSettings) -> SyncSettings {
if let Some(token) = self.get_sync_token() {
sync_settings = sync_settings.token(token);
}
sync_settings
}
pub fn process_sync_response(
&self,
sync_response: &SyncResponse,
) -> Result<LoopCtrl, matrix_sdk::Error> {
self.set_sync_token(sync_response.next_batch.clone())
.map_err(|err| matrix_sdk::Error::UnknownError(err.into()))?;
Ok(LoopCtrl::Continue)
}
pub async fn sync_once(
&self,
client: &Client,
sync_settings: SyncSettings,
) -> Result<SyncResponse, matrix_sdk::Error> {
let sync_stream = client
.sync_stream(self.process_sync_settings(sync_settings))
.await;
tokio::pin!(sync_stream);
let response = sync_stream
.next()
.await
.unwrap()?;
trace!("Sync response: {:?}", response);
self.process_sync_response(&response)?;
Ok(response)
}
pub async fn sync(
&self,
client: &Client,
sync_settings: SyncSettings,
) -> Result<(), matrix_sdk::Error> {
let sync_stream = client
.sync_stream(self.process_sync_settings(sync_settings))
.await;
tokio::pin!(sync_stream);
loop {
let response = sync_stream
.next()
.await
.unwrap()?;
trace!("Sync response: {:?}", response);
self.process_sync_response(&response)?;
}
}
}