use std::{
fmt,
sync::{Arc, Mutex, MutexGuard},
};
use tokio::sync::broadcast;
use crate::{db::models::PriceTickRow, sync::SyncMode};
use super::process::{
error::{SyncProcessFatalError, SyncProcessRecoverableError},
sync_funding_settlements_task::funding_settlements_state::FundingSettlementsState,
sync_price_history_task::price_history_state::PriceHistoryState,
};
/// Sub-states reported while the sync process is not (yet) synced.
///
/// Wrapped by [`SyncStatus::NotSynced`]; each variant has a short
/// human-readable label available through its `Display` implementation.
#[derive(Debug, Clone)]
pub enum SyncStatusNotSynced {
    /// Displayed as `"Not initiated"`.
    NotInitiated,
    /// Displayed as `"Starting"`.
    Starting,
    /// Displayed as `"In progress"`.
    InProgress,
    /// Displayed as `"Waiting for resync"`.
    WaitingForResync,
    /// A recoverable error occurred. The error is held behind an [`Arc`], so
    /// cloning the status only bumps a reference count.
    Failed(Arc<SyncProcessRecoverableError>),
    /// Displayed as `"Restarting"`.
    Restarting,
}
impl fmt::Display for SyncStatusNotSynced {
    /// Writes a short human-readable label for the sub-state.
    ///
    /// The `Failed` variant additionally appends the `Display` output of the
    /// error it carries.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            // The only variant with a payload: format it and return right away.
            Self::Failed(error) => return write!(f, "Failed: {error}"),
            Self::NotInitiated => "Not initiated",
            Self::Starting => "Starting",
            Self::InProgress => "In progress",
            Self::WaitingForResync => "Waiting for resync",
            Self::Restarting => "Restarting",
        };
        f.write_str(label)
    }
}
/// Overall status of the sync process.
///
/// [`SyncStatus::is_stopped`] treats `Shutdown` and `Terminated` as stopped
/// states.
#[derive(Debug, Clone)]
pub enum SyncStatus {
    /// Not synced; the payload gives the specific sub-state.
    NotSynced(SyncStatusNotSynced),
    /// Displayed as `"Synced"`.
    Synced,
    /// Displayed as `"Backfilled"`.
    Backfilled,
    /// Displayed as `"Shutdown initiated"`. Not counted as stopped by
    /// [`SyncStatus::is_stopped`].
    ShutdownInitiated,
    /// Displayed as `"Shutdown"`. Counted as stopped.
    Shutdown,
    /// Terminated with a fatal error. The error is held behind an [`Arc`], so
    /// cloning the status only bumps a reference count. Counted as stopped.
    Terminated(Arc<SyncProcessFatalError>),
}
impl SyncStatus {
    /// Returns `true` when the status is `Shutdown` or `Terminated`, and
    /// `false` for every other variant (including `ShutdownInitiated`).
    pub fn is_stopped(&self) -> bool {
        // Spelled out exhaustively so that adding a variant forces a decision here.
        match self {
            Self::Shutdown | Self::Terminated(_) => true,
            Self::NotSynced(_) | Self::Synced | Self::Backfilled | Self::ShutdownInitiated => {
                false
            }
        }
    }
}
impl fmt::Display for SyncStatus {
    /// Writes a short human-readable label for the status.
    ///
    /// `NotSynced` embeds the `Display` output of its sub-state in
    /// parentheses, and `Terminated` appends the `Display` output of the
    /// fatal error it carries.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            // Variants with a payload are formatted and returned directly.
            Self::NotSynced(status) => return write!(f, "Not synced ({status})"),
            Self::Terminated(error) => return write!(f, "Terminated: {error}"),
            Self::Synced => "Synced",
            Self::Backfilled => "Backfilled",
            Self::ShutdownInitiated => "Shutdown initiated",
            Self::Shutdown => "Shutdown",
        };
        f.write_str(label)
    }
}
impl From<SyncStatusNotSynced> for SyncStatus {
fn from(value: SyncStatusNotSynced) -> Self {
Self::NotSynced(value)
}
}
impl From<SyncProcessRecoverableError> for SyncStatus {
fn from(value: SyncProcessRecoverableError) -> Self {
SyncStatusNotSynced::Failed(Arc::new(value)).into()
}
}
impl From<Arc<SyncProcessFatalError>> for SyncStatus {
fn from(value: Arc<SyncProcessFatalError>) -> Self {
Self::Terminated(value)
}
}
impl From<SyncProcessFatalError> for SyncStatus {
fn from(value: SyncProcessFatalError) -> Self {
Arc::new(value).into()
}
}
/// Message type carried by the sync broadcast channel
/// (see [`SyncReceiver`]).
///
/// Each variant has a matching `From` implementation, so payloads can be
/// turned into a `SyncUpdate` with `.into()`.
#[derive(Debug, Clone)]
pub enum SyncUpdate {
    /// A new [`SyncStatus`].
    Status(SyncStatus),
    /// Carries a `PriceTickRow`.
    PriceTick(PriceTickRow),
    /// Carries a `PriceHistoryState`.
    PriceHistoryState(PriceHistoryState),
    /// Carries a `FundingSettlementsState`.
    FundingSettlementsState(FundingSettlementsState),
}
impl From<SyncStatus> for SyncUpdate {
fn from(value: SyncStatus) -> Self {
Self::Status(value)
}
}
impl From<PriceTickRow> for SyncUpdate {
fn from(value: PriceTickRow) -> Self {
Self::PriceTick(value)
}
}
impl From<PriceHistoryState> for SyncUpdate {
fn from(value: PriceHistoryState) -> Self {
Self::PriceHistoryState(value)
}
}
impl From<FundingSettlementsState> for SyncUpdate {
fn from(value: FundingSettlementsState) -> Self {
Self::FundingSettlementsState(value)
}
}
/// Sending half of the `tokio` broadcast channel that carries [`SyncUpdate`]
/// messages. Only visible to the parent module.
pub(super) type SyncTransmitter = broadcast::Sender<SyncUpdate>;
/// Receiving half of the `tokio` broadcast channel that carries
/// [`SyncUpdate`] messages.
pub type SyncReceiver = broadcast::Receiver<SyncUpdate>;
/// Read-only access to the sync process: its mode, its update stream and its
/// current status.
///
/// Implementors must be `Send + Sync + 'static`.
pub trait SyncReader: Send + Sync + 'static {
    /// Returns the [`SyncMode`] of the sync process.
    fn mode(&self) -> SyncMode;

    /// Returns a receiver for [`SyncUpdate`] messages.
    fn update_receiver(&self) -> SyncReceiver;

    /// Returns an owned copy of the current [`SyncStatus`].
    fn status_snapshot(&self) -> SyncStatus;
}
/// Holds the current [`SyncStatus`] behind a mutex and broadcasts every
/// status change over the update channel.
#[derive(Debug)]
pub(super) struct SyncStatusManager {
    /// Mode handed in at construction; returned by `SyncReader::mode`.
    mode: SyncMode,
    /// Latest status, guarded by a standard-library mutex.
    status: Mutex<SyncStatus>,
    /// Sender used to broadcast updates and to create new receivers.
    update_tx: SyncTransmitter,
}
impl SyncStatusManager {
    /// Creates a manager for `mode` that broadcasts over `update_tx`.
    ///
    /// The initial status is
    /// `SyncStatus::NotSynced(SyncStatusNotSynced::NotInitiated)`. No update
    /// is broadcast for this initial value.
    pub fn new(mode: SyncMode, update_tx: SyncTransmitter) -> Arc<Self> {
        let initial: SyncStatus = SyncStatusNotSynced::NotInitiated.into();
        let manager = Self {
            mode,
            status: Mutex::new(initial),
            update_tx,
        };
        Arc::new(manager)
    }

    /// Acquires the status mutex.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned.
    fn lock_status(&self) -> MutexGuard<'_, SyncStatus> {
        let lock_result = self.status.lock();
        lock_result.expect("`SyncStatusManager` mutex can't be poisoned")
    }

    /// Stores `new_status` through an already acquired guard, releases the
    /// guard, and then broadcasts the new status.
    fn update_status_guard(
        &self,
        mut status_guard: MutexGuard<'_, SyncStatus>,
        new_status: SyncStatus,
    ) {
        let outgoing = new_status.clone();
        *status_guard = new_status;

        // The lock is released before broadcasting, so `send` never runs
        // while the mutex is held.
        // NOTE(review): concurrent `update` calls may therefore broadcast in
        // a different order than the statuses were stored — confirm that
        // subscribers tolerate this.
        drop(status_guard);

        // The send result is deliberately ignored; tokio's broadcast `send`
        // reports an error when there are no active receivers.
        let _ = self.update_tx.send(outgoing.into());
    }

    /// Replaces the current status with `new_status` and broadcasts it.
    ///
    /// # Panics
    ///
    /// Panics if the status mutex is poisoned.
    pub fn update(&self, new_status: SyncStatus) {
        self.update_status_guard(self.lock_status(), new_status);
    }
}
impl SyncReader for SyncStatusManager {
fn mode(&self) -> SyncMode {
self.mode
}
fn update_receiver(&self) -> SyncReceiver {
self.update_tx.subscribe()
}
fn status_snapshot(&self) -> SyncStatus {
self.lock_status().clone()
}
}