1use std::sync::{Arc, OnceLock};
2use url::Url;
3
4use super::config::Config;
5use super::database::{Database, DatabaseSetupError};
6use super::jax_state::JaxState;
7use super::sync_manager::SyncEvent;
8
9use common::prelude::*;
10
/// Application state bundling the peer node, the database handle, the
/// protocol state and the channel used to talk to the sync manager.
///
/// `jax_state` and `sync_sender` sit behind `Arc`, so every clone of `State`
/// shares the same protocol state and the same (possibly still unset) sender.
#[derive(Clone)]
pub struct State {
    /// Peer built in `State::from_config`.
    node: Peer,
    /// Database connection opened in `State::from_config`.
    database: Database,
    /// Protocol state handed to the peer builder; shared between clones.
    jax_state: Arc<JaxState>,
    /// Sender for `SyncEvent`s. Empty until `State::set_sync_sender` is
    /// called; being a `OnceLock`, it can only be filled once.
    sync_sender: Arc<OnceLock<flume::Sender<SyncEvent>>>,
}
18
19impl State {
20 pub async fn from_config(config: &Config) -> Result<Self, StateSetupError> {
21 let sqlite_database_url = match config.sqlite_path {
22 Some(ref path) => {
23 if !path.exists() {
25 return Err(StateSetupError::DatabasePathDoesNotExist);
26 }
27 Url::parse(&format!("sqlite://{}", path.display()))
29 .map_err(|_| StateSetupError::InvalidDatabaseUrl)
30 }
31 None => Url::parse("sqlite::memory:").map_err(|_| StateSetupError::InvalidDatabaseUrl),
33 }?;
34 tracing::info!("Database URL: {:?}", sqlite_database_url);
35 let database = Database::connect(&sqlite_database_url).await?;
36
37 let jax_state = Arc::new(JaxState::new(database.clone()));
41
42 let mut node_builder = Peer::builder().protocol_state(jax_state.clone());
44
45 if config.node_listen_addr.is_some() {
47 node_builder = node_builder.socket_addr(config.node_listen_addr.unwrap());
48 }
49 if config.node_secret.is_some() {
51 node_builder = node_builder.secret_key(config.node_secret.clone().unwrap());
52 }
53 if config.node_blobs_store_path.is_some() {
55 node_builder =
56 node_builder.blobs_store_path(config.node_blobs_store_path.clone().unwrap());
57 }
58
59 let node = node_builder.build().await;
61
62 let bound_addrs = node.endpoint().bound_sockets();
64 tracing::info!("Node id: {} (with JAX protocol)", node.id());
65 tracing::info!("Peer listening on: {:?}", bound_addrs);
66
67 jax_state.set_blobs(node.blobs().clone());
69
70 Ok(Self {
71 node,
72 database,
73 jax_state,
74 sync_sender: Arc::new(OnceLock::new()),
75 })
76 }
77
78 pub fn node(&self) -> &Peer {
79 &self.node
80 }
81
82 pub fn database(&self) -> &Database {
83 &self.database
84 }
85
86 pub fn jax_state(&self) -> &Arc<JaxState> {
87 &self.jax_state
88 }
89
90 pub fn set_sync_sender(&self, sender: flume::Sender<SyncEvent>) {
92 let _ = self.sync_sender.set(sender.clone());
93 self.jax_state.set_sync_sender(sender);
95 }
96
97 pub fn send_sync_event(&self, event: SyncEvent) -> Result<(), SyncEventError> {
99 let sender = self
100 .sync_sender
101 .get()
102 .ok_or(SyncEventError::SyncManagerNotInitialized)?;
103 sender.send(event).map_err(|_| SyncEventError::SendFailed)
104 }
105}
106
107impl AsRef<Peer> for State {
108 fn as_ref(&self) -> &Peer {
109 &self.node
110 }
111}
112
113impl AsRef<Database> for State {
114 fn as_ref(&self) -> &Database {
115 &self.database
116 }
117}
118
/// Errors returned by `State::from_config`.
#[derive(Debug, thiserror::Error)]
pub enum StateSetupError {
    /// A SQLite path was configured but nothing exists at that path.
    #[error("Database path does not exist")]
    DatabasePathDoesNotExist,
    /// Setting up the database failed; wraps the underlying error
    /// (converted automatically via `#[from]`).
    #[error("Database setup error")]
    DatabaseSetupError(#[from] DatabaseSetupError),
    /// The SQLite connection URL could not be parsed.
    #[error("Invalid database URL")]
    InvalidDatabaseUrl,
}
128
/// Errors returned by `State::send_sync_event`.
#[derive(Debug, thiserror::Error)]
pub enum SyncEventError {
    /// No sync sender has been stored yet (`State::set_sync_sender` was not
    /// called).
    #[error("Sync manager not initialized")]
    SyncManagerNotInitialized,
    /// The channel refused the event; the underlying send error is discarded.
    #[error("Failed to send sync event")]
    SendFailed,
}