use std::{
fmt,
sync::{Arc, Mutex},
};
use async_trait::async_trait;
use tokio::{
sync::broadcast::{self, error::RecvError},
time,
};
use lnm_sdk::{api_v2::WebSocketClient, api_v3::RestClient};
use crate::{
db::Database,
shared::Lookback,
sync::config::{SyncConfig, SyncControllerConfig},
tui::{
TuiControllerShutdown,
error::{Result as TuiResult, TuiError},
},
util::AbortOnDropHandle,
};
use super::{
error::{Result, SyncError},
process::{SyncProcess, error::SyncProcessFatalError},
state::{SyncReader, SyncReceiver, SyncStatus, SyncStatusManager, SyncTransmitter, SyncUpdate},
};
#[derive(Debug)]
pub struct SyncController {
config: SyncControllerConfig,
handle: Mutex<Option<AbortOnDropHandle<()>>>,
shutdown_tx: broadcast::Sender<()>,
status_manager: Arc<SyncStatusManager>,
}
impl SyncController {
fn new(
config: &SyncConfig,
handle: AbortOnDropHandle<()>,
shutdown_tx: broadcast::Sender<()>,
status_manager: Arc<SyncStatusManager>,
) -> Arc<Self> {
Arc::new(Self {
config: config.into(),
handle: Mutex::new(Some(handle)),
shutdown_tx,
status_manager,
})
}
pub fn reader(&self) -> Arc<dyn SyncReader> {
self.status_manager.clone()
}
pub fn mode(&self) -> SyncMode {
self.status_manager.mode()
}
pub fn update_receiver(&self) -> SyncReceiver {
self.status_manager.update_receiver()
}
pub fn status_snapshot(&self) -> SyncStatus {
self.status_manager.status_snapshot()
}
fn try_consume_handle(&self) -> Option<AbortOnDropHandle<()>> {
self.handle
.lock()
.expect("`SyncController` mutex can't be poisoned")
.take()
}
pub async fn shutdown(&self) -> Result<()> {
let Some(mut handle) = self.try_consume_handle() else {
return Err(SyncError::SyncAlreadyShutdown);
};
if handle.is_finished() {
let status = self.status_manager.status_snapshot();
return Err(SyncError::SyncAlreadyTerminated(status));
}
self.status_manager.update(SyncStatus::ShutdownInitiated);
let shutdown_send_res = self.shutdown_tx.send(()).map_err(|e| {
handle.abort();
SyncProcessFatalError::SendShutdownSignalFailed(e)
});
let shutdown_res = match shutdown_send_res {
Ok(_) => {
tokio::select! {
join_res = &mut handle => {
join_res.map_err(SyncProcessFatalError::SyncProcessTaskJoin)
}
_ = time::sleep(self.config.shutdown_timeout()) => {
handle.abort();
Err(SyncProcessFatalError::ShutdownTimeout)
}
}
}
Err(e) => Err(e),
};
if let Err(e) = shutdown_res {
let e_ref = Arc::new(e);
self.status_manager.update(e_ref.clone().into());
return Err(SyncError::SyncShutdownFailed(e_ref));
}
self.status_manager.update(SyncStatus::Shutdown);
Ok(())
}
pub async fn until_stopped(&self) -> SyncStatus {
let mut sync_rx = self.update_receiver();
let status = self.status_snapshot();
if status.is_stopped() {
return status;
}
loop {
match sync_rx.recv().await {
Ok(sync_update) => {
if let SyncUpdate::Status(status) = sync_update
&& status.is_stopped()
{
return status;
}
}
Err(RecvError::Lagged(_)) => {
let status = self.status_snapshot();
if status.is_stopped() {
return status;
}
}
Err(RecvError::Closed) => return self.status_snapshot(),
}
}
}
}
#[async_trait]
impl TuiControllerShutdown for SyncController {
async fn tui_shutdown(&self) -> TuiResult<()> {
self.shutdown().await.map_err(TuiError::SyncShutdownFailed)
}
}
#[derive(Debug, Clone, Copy)]
pub enum SyncMode {
Backfill,
Live(Option<Lookback>),
Full,
}
impl SyncMode {
pub fn backfill() -> Self {
SyncMode::Backfill
}
pub fn live_no_lookback() -> Self {
SyncMode::Live(None)
}
pub fn live_with_lookback(lookback: Lookback) -> Self {
SyncMode::Live(Some(lookback))
}
pub fn full() -> Self {
SyncMode::Full
}
pub fn live_feed_active(&self) -> bool {
match self {
SyncMode::Backfill => false,
SyncMode::Live(_) => true,
SyncMode::Full => true,
}
}
}
impl fmt::Display for SyncMode {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
SyncMode::Backfill => write!(f, "Backfill"),
SyncMode::Live(lookback) => match lookback {
Some(lookback) => write!(f, "Live (lookback: {})", lookback),
None => write!(f, "Live"),
},
SyncMode::Full => write!(f, "Full"),
}
}
}
pub(super) enum SyncModeInt {
Backfill {
api_rest: Arc<RestClient>,
},
LiveNoLookback {
api_rest: Arc<RestClient>,
api_ws: Arc<WebSocketClient>,
},
LiveWithLookback {
api_rest: Arc<RestClient>,
api_ws: Arc<WebSocketClient>,
lookback: Lookback,
},
Full {
api_rest: Arc<RestClient>,
api_ws: Arc<WebSocketClient>,
},
}
impl From<&SyncModeInt> for SyncMode {
fn from(value: &SyncModeInt) -> Self {
match value {
SyncModeInt::Backfill { .. } => Self::Backfill,
SyncModeInt::LiveNoLookback {
api_rest: _,
api_ws: _,
} => Self::Live(None),
SyncModeInt::LiveWithLookback {
api_rest: _,
api_ws: _,
lookback,
} => Self::Live(Some(*lookback)),
SyncModeInt::Full { .. } => Self::Full,
}
}
}
pub struct SyncEngine {
config: SyncConfig,
db: Arc<Database>,
mode_int: SyncModeInt,
status_manager: Arc<SyncStatusManager>,
update_tx: SyncTransmitter,
}
impl SyncEngine {
fn with_mode_int(
config: impl Into<SyncConfig>,
db: Arc<Database>,
mode_int: SyncModeInt,
) -> Self {
let (update_tx, _) = broadcast::channel::<SyncUpdate>(1_000);
let mode = (&mode_int).into();
let status_manager = SyncStatusManager::new(mode, update_tx.clone());
Self {
config: config.into(),
db,
mode_int,
status_manager,
update_tx,
}
}
pub(crate) fn live_no_lookback(
config: impl Into<SyncConfig>,
db: Arc<Database>,
api_rest: Arc<RestClient>,
api_ws: Arc<WebSocketClient>,
) -> Self {
let mode_int = SyncModeInt::LiveNoLookback { api_rest, api_ws };
Self::with_mode_int(config, db, mode_int)
}
pub(crate) fn live_with_lookback(
config: impl Into<SyncConfig>,
db: Arc<Database>,
api_rest: Arc<RestClient>,
api_ws: Arc<WebSocketClient>,
lookback: Lookback,
) -> Self {
let mode_int = SyncModeInt::LiveWithLookback {
api_rest,
api_ws,
lookback,
};
Self::with_mode_int(config, db, mode_int)
}
pub(crate) fn full(
config: impl Into<SyncConfig>,
db: Arc<Database>,
api_rest: Arc<RestClient>,
api_ws: Arc<WebSocketClient>,
) -> Self {
let mode_int = SyncModeInt::Full { api_rest, api_ws };
Self::with_mode_int(config, db, mode_int)
}
pub fn new(
config: impl Into<SyncConfig>,
db: Arc<Database>,
api_domain: impl ToString,
mode: SyncMode,
) -> Result<Self> {
let config: SyncConfig = config.into();
let domain = api_domain.to_string();
let api_rest = RestClient::new(&config, domain.clone()).map_err(SyncError::RestApiInit)?;
let api_ws = WebSocketClient::new(&config, domain);
let mode = match mode {
SyncMode::Backfill => SyncModeInt::Backfill { api_rest },
SyncMode::Live(lookback) => match lookback {
Some(lookback) => SyncModeInt::LiveWithLookback {
api_rest,
api_ws,
lookback,
},
None => SyncModeInt::LiveNoLookback { api_rest, api_ws },
},
SyncMode::Full => SyncModeInt::Full { api_rest, api_ws },
};
Ok(Self::with_mode_int(config, db, mode))
}
pub fn reader(&self) -> Arc<dyn SyncReader> {
self.status_manager.clone()
}
pub fn update_receiver(&self) -> SyncReceiver {
self.status_manager.update_receiver()
}
pub fn status_snapshot(&self) -> SyncStatus {
self.status_manager.status_snapshot()
}
pub fn start(self) -> Arc<SyncController> {
let (shutdown_tx, _) = broadcast::channel::<()>(1);
let handle = SyncProcess::spawn(
&self.config,
self.db,
self.mode_int,
shutdown_tx.clone(),
self.status_manager.clone(),
self.update_tx,
);
SyncController::new(&self.config, handle, shutdown_tx, self.status_manager)
}
}