delta/lib.rs
1pub use network::connection_params::ConnectionParams;
2use std::{future::Future, pin::Pin, sync::Arc};
3use tokio::{sync::Mutex, task::JoinHandle};
4use uuid::Uuid;
5use std::error::Error;
6
7mod change;
8mod database;
9mod merge;
10mod network;
11mod node;
12
13pub use change::{Change, Value, TransactionType};
14use node::node_handler::NodeHandler;
15
16/// `Delta` is the main struct representing a node in the Delta network. It can function as either a Master or Auxiliary node.
17/// The struct contains various attributes and methods to manage and interact with the node.
18pub struct Delta {
19 node_handler: NodeHandler,
20 node_handle: JoinHandle<()>,
21 node_id: String,
22 node_type: NodeType,
23}
24
/// The type of node in the Delta network.
///
/// A `NodeType` can be:
/// - `Master`: Represents the primary node that can manage and coordinate with auxiliary nodes.
/// - `Auxiliary`: Represents a secondary node that connects to and syncs with the master node.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeType {
    /// Primary node.
    Master,
    /// Secondary node attached to a master.
    Auxiliary,
}
35
36/// Configuration options for initializing a Delta node.
37///
38/// `DeltaConfig` can be:
39/// - `Master`: Configuration for a Master node.
40/// - `Auxiliary`: Configuration for an Auxiliary node, which includes connection parameters for the master node.
41#[derive(Clone)]
42pub enum DeltaConfig {
43 /// Configuration for a Master node.
44 ///
45 /// # Fields
46 /// * `ip` - The IP address for the node.
47 /// * `port` - The port number for the node.
48 /// * `db_path` - The file path to the database.
49 /// * `tables` - A list of tables to be tracked.
50 Master {
51 ip: String,
52 port: u16,
53 db_path: String,
54 tables: Vec<String>,
55 },
56 /// Configuration for an Auxiliary node.
57 ///
58 /// # Fields
59 /// * `ip` - The IP address for the node.
60 /// * `port` - The port number for the node.
61 /// * `db_path` - The file path to the database.
62 /// * `tables` - A list of tables to be tracked.
63 /// * `master_connection_params` - Connection parameters for connecting to the master node.
64 Auxiliary {
65 ip: String,
66 port: u16,
67 db_path: String,
68 tables: Vec<String>,
69 master_connection_params: ConnectionParams,
70 },
71}
72
73impl Delta {
74 /// Creates a new Delta node with the specified configuration and an optional custom node ID.
75 ///
76 /// # Arguments
77 ///
78 /// * `delta_config` - The configuration for the Delta node.
79 /// * `custom_node_id` - An optional custom node ID. If not provided, a new UUID will be generated.
80 /// * `error_handler` - An optional error handler that will be called when an error occurs.
81 ///
82 /// # Returns
83 ///
84 /// A new instance of `Delta`.
85 ///
86 /// # Examples
87 ///
88 /// ```
89 /// let config = DeltaConfig::Master { ip: "127.0.0.1".to_string(), port: 8080, db_path: "/path/to/db".to_string(), tables: vec!["table1".to_string()] };
90 /// let delta = Delta::new(config, None, None).await;
91 /// ```
92 pub async fn new(
93 delta_config: DeltaConfig,
94 custom_node_id: Option<String>,
95 error_handler: Option<Arc<dyn Fn(String) + Send + Sync + 'static>>
96 ) -> Delta {
97 // Generate UUID for the node_id or use custom node_id if provided
98 let node_id: String;
99 match custom_node_id {
100 Some(id) => node_id = id,
101 None => node_id = Uuid::new_v4().to_string(),
102 }
103
104 let (node_handler, node_receiver) = NodeHandler::new(error_handler);
105
106 let receiver_arc = Arc::new(Mutex::new(node_receiver));
107
108 let node_handle = node_handler
109 .start(receiver_arc, node_id.clone(), delta_config.clone())
110 .await;
111
112 let node_type = match delta_config {
113 DeltaConfig::Master {
114 ip: _,
115 port: _,
116 db_path: _,
117 tables: _,
118 } => NodeType::Master,
119 DeltaConfig::Auxiliary {
120 ip: _,
121 port: _,
122 db_path: _,
123 tables: _,
124 master_connection_params: _,
125 } => NodeType::Auxiliary,
126 };
127
128 Delta {
129 node_handler,
130 node_handle,
131 node_id,
132 node_type,
133 }
134 }
135
136 /// Shuts down the Delta node, disconnecting from all nodes and terminating running processes.
137 ///
138 /// # Returns
139 ///
140 /// A boolean indicating whether the shutdown was successful.
141 pub async fn shutdown(&self) -> Result<bool, Box<dyn Error>> {
142 let success = self.node_handler.shutdown().await?;
143 if success {
144 self.node_handle.abort();
145 }
146 Ok(success)
147 }
148
149 // TODO implement the adding and removing of tables
150 // Adds a table to the list of tracked tables
151 // fn add_table(table: String) {}
152
153 // TODO implement the adding and removing of tables
154 // Removes a table from the list of tracked tables (can put on the backlog for now)
155 // fn remove_table(table: String) {}
156
157 /// Returns the list of tables that the Delta node is currently tracking.
158 ///
159 /// # Returns
160 ///
161 /// A vector of strings representing the table names.
162 pub async fn get_tracked_tables(&self) -> Result<Vec<String>, Box<dyn Error>> {
163 let tracked_tables = self.node_handler.get_tracked_tables().await?;
164 Ok(tracked_tables)
165 }
166
167 /// Executes a write operation using the provided SQL string.
168 ///
169 /// # Arguments
170 ///
171 /// * `sql` - The SQL string to be executed.
172 ///
173 /// # Returns
174 ///
175 /// An integer representing the result of the write operation.
176 // For now, allow all SQL statements — in the future we could do checking to make sure that no unsupported things are used
177 pub async fn execute_write(&self, sql: String) -> Result<i32, Box<dyn Error>> {
178 let result = self.node_handler.execute_write(sql).await?;
179 Ok(result)
180 }
181
182 /// Executes a read operation using the provided SQL string.
183 ///
184 /// # Arguments
185 ///
186 /// * `sql` - The SQL string to be executed.
187 ///
188 /// # Returns
189 ///
190 /// A vector of vectors containing the results of the read operation.
191 pub async fn execute_read(&self, sql: String) -> Result<Vec<Vec<Value>>, Box<dyn Error>> {
192 let result = self.node_handler.execute_read(sql).await?;
193 Ok(result)
194 }
195
196 /// Returns the entire changelog of the Delta node.
197 ///
198 /// # Returns
199 ///
200 /// A vector of `Change` objects representing the changelog.
201 pub async fn get_changelog(&self) -> Result<Vec<Change>, Box<dyn Error>> {
202 let changelog = self.node_handler.get_changelog().await?;
203 Ok(changelog)
204 }
205
206 /// Connects to a node using the provided connection parameters.
207 ///
208 /// # Arguments
209 ///
210 /// * `connection_params` - The connection parameters for the node to connect to.
211 ///
212 /// # Returns
213 ///
214 /// A boolean indicating whether the connection was successful.
215 // TODO enable https connections
216 pub async fn connect(&self, connection_params: ConnectionParams) -> Result<bool, Box<dyn Error>> {
217 let success = self.node_handler.connect(connection_params).await?;
218 Ok(success)
219 }
220
221 /// Disconnects from the specified node.
222 ///
223 /// # Arguments
224 ///
225 /// * `node_id` - The ID of the node to disconnect from.
226 ///
227 /// # Returns
228 ///
229 /// A boolean indicating whether the disconnection was successful.
230 pub async fn disconnect(&self, node_id: String) -> Result<bool, Box<dyn Error>> {
231 let success = self.node_handler.disconnect(node_id).await?;
232 Ok(success)
233 }
234
235 /// Returns the list of connected nodes.
236 ///
237 /// # Returns
238 ///
239 /// A vector of strings representing the IDs of connected nodes.
240 pub async fn get_connected_devices(&self) -> Result<Vec<String>, Box<dyn Error>> {
241 let connected_nodes = self.node_handler.get_connected_nodes().await?;
242 Ok(connected_nodes)
243 }
244
245 /// Returns the ID of the Delta node.
246 ///
247 /// # Returns
248 ///
249 /// A string representing the node ID.
250 pub fn get_node_id(&self) -> String {
251 self.node_id.clone()
252 }
253
254 /// Registers an asynchronous callback function to be called on a database change.
255 ///
256 /// # Arguments
257 ///
258 /// * `callback` - The callback function to register.
259 ///
260 /// # Returns
261 ///
262 /// A boolean indicating whether the callback registration was successful.
263 pub async fn register_db_change_callback<F, Fut>(&self, callback: F) -> Result<bool, Box<dyn Error>>
264 where
265 F: Fn() -> Fut + Send + Sync + 'static,
266 Fut: Future<Output = ()> + Send + 'static,
267 {
268 let cb = Arc::new(move || {
269 let fut = callback();
270 Box::pin(fut) as Pin<Box<dyn Future<Output = ()> + Send>>
271 });
272
273 let success = self.node_handler.register_db_change_callback(cb).await?;
274
275 Ok(success)
276 }
277}