1use std::sync::{Arc, OnceLock};
2use url::Url;
3
4use super::config::Config;
5use super::database::{Database, DatabaseSetupError};
6use super::jax_state::JaxState;
7use super::sync_manager::SyncEvent;
8
9use common::prelude::*;
10
11#[derive(Clone)]
12pub struct State {
13    node: Peer,
14    database: Database,
15    jax_state: Arc<JaxState>,
16    sync_sender: Arc<OnceLock<flume::Sender<SyncEvent>>>,
17}
18
19impl State {
20    pub async fn from_config(config: &Config) -> Result<Self, StateSetupError> {
21        let sqlite_database_url = match config.sqlite_path {
22            Some(ref path) => {
23                if !path.exists() {
25                    return Err(StateSetupError::DatabasePathDoesNotExist);
26                }
27                Url::parse(&format!("sqlite://{}", path.display()))
29                    .map_err(|_| StateSetupError::InvalidDatabaseUrl)
30            }
31            None => Url::parse("sqlite::memory:").map_err(|_| StateSetupError::InvalidDatabaseUrl),
33        }?;
34        tracing::info!("Database URL: {:?}", sqlite_database_url);
35        let database = Database::connect(&sqlite_database_url).await?;
36
37        let jax_state = Arc::new(JaxState::new(database.clone()));
41
42        let mut node_builder = Peer::builder().protocol_state(jax_state.clone());
44
45        if config.node_listen_addr.is_some() {
47            node_builder = node_builder.socket_addr(config.node_listen_addr.unwrap());
48        }
49        if config.node_secret.is_some() {
51            node_builder = node_builder.secret_key(config.node_secret.clone().unwrap());
52        }
53        if config.node_blobs_store_path.is_some() {
55            node_builder =
56                node_builder.blobs_store_path(config.node_blobs_store_path.clone().unwrap());
57        }
58
59        let node = node_builder.build().await;
61
62        let bound_addrs = node.endpoint().bound_sockets();
64        tracing::info!("Node id: {} (with JAX protocol)", node.id());
65        tracing::info!("Peer listening on: {:?}", bound_addrs);
66
67        jax_state.set_blobs(node.blobs().clone());
69
70        Ok(Self {
71            node,
72            database,
73            jax_state,
74            sync_sender: Arc::new(OnceLock::new()),
75        })
76    }
77
78    pub fn node(&self) -> &Peer {
79        &self.node
80    }
81
82    pub fn database(&self) -> &Database {
83        &self.database
84    }
85
86    pub fn jax_state(&self) -> &Arc<JaxState> {
87        &self.jax_state
88    }
89
90    pub fn set_sync_sender(&self, sender: flume::Sender<SyncEvent>) {
92        let _ = self.sync_sender.set(sender.clone());
93        self.jax_state.set_sync_sender(sender);
95    }
96
97    pub fn send_sync_event(&self, event: SyncEvent) -> Result<(), SyncEventError> {
99        let sender = self
100            .sync_sender
101            .get()
102            .ok_or(SyncEventError::SyncManagerNotInitialized)?;
103        sender.send(event).map_err(|_| SyncEventError::SendFailed)
104    }
105}
106
107impl AsRef<Peer> for State {
108    fn as_ref(&self) -> &Peer {
109        &self.node
110    }
111}
112
113impl AsRef<Database> for State {
114    fn as_ref(&self) -> &Database {
115        &self.database
116    }
117}
118
119#[derive(Debug, thiserror::Error)]
120pub enum StateSetupError {
121    #[error("Database path does not exist")]
122    DatabasePathDoesNotExist,
123    #[error("Database setup error")]
124    DatabaseSetupError(#[from] DatabaseSetupError),
125    #[error("Invalid database URL")]
126    InvalidDatabaseUrl,
127}
128
129#[derive(Debug, thiserror::Error)]
130pub enum SyncEventError {
131    #[error("Sync manager not initialized")]
132    SyncManagerNotInitialized,
133    #[error("Failed to send sync event")]
134    SendFailed,
135}