delta/
lib.rs

1pub use network::connection_params::ConnectionParams;
2use std::{future::Future, pin::Pin, sync::Arc};
3use tokio::{sync::Mutex, task::JoinHandle};
4use uuid::Uuid;
5use std::error::Error;
6
7mod change;
8mod database;
9mod merge;
10mod network;
11mod node;
12
13pub use change::{Change, Value, TransactionType};
14use node::node_handler::NodeHandler;
15
16/// `Delta` is the main struct representing a node in the Delta network. It can function as either a Master or Auxiliary node.
17/// The struct contains various attributes and methods to manage and interact with the node.
18pub struct Delta {
19    node_handler: NodeHandler,
20    node_handle: JoinHandle<()>,
21    node_id: String,
22    node_type: NodeType,
23}
24
25/// The type of node in the Delta network.
26///
27/// A `NodeType` can be:
28/// - `Master`: Represents the primary node that can manage and coordinate with auxiliary nodes.
29/// - `Auxiliary`: Represents a secondary node that connects to and syncs with the master node.
30
31pub enum NodeType {
32    Master,
33    Auxiliary,
34}
35
36/// Configuration options for initializing a Delta node.
37///
38/// `DeltaConfig` can be:
39/// - `Master`: Configuration for a Master node.
40/// - `Auxiliary`: Configuration for an Auxiliary node, which includes connection parameters for the master node.
41#[derive(Clone)]
42pub enum DeltaConfig {
43    /// Configuration for a Master node.
44    ///
45    /// # Fields
46    /// * `ip` - The IP address for the node.
47    /// * `port` - The port number for the node.
48    /// * `db_path` - The file path to the database.
49    /// * `tables` - A list of tables to be tracked.
50    Master {
51        ip: String,
52        port: u16,
53        db_path: String,
54        tables: Vec<String>,
55    },
56    /// Configuration for an Auxiliary node.
57    ///
58    /// # Fields
59    /// * `ip` - The IP address for the node.
60    /// * `port` - The port number for the node.
61    /// * `db_path` - The file path to the database.
62    /// * `tables` - A list of tables to be tracked.
63    /// * `master_connection_params` - Connection parameters for connecting to the master node.
64    Auxiliary {
65        ip: String,
66        port: u16,
67        db_path: String,
68        tables: Vec<String>,
69        master_connection_params: ConnectionParams,
70    },
71}
72
73impl Delta {
74    /// Creates a new Delta node with the specified configuration and an optional custom node ID.
75    ///
76    /// # Arguments
77    ///
78    /// * `delta_config` - The configuration for the Delta node.
79    /// * `custom_node_id` - An optional custom node ID. If not provided, a new UUID will be generated.
80    /// * `error_handler` - An optional error handler that will be called when an error occurs.
81    /// 
82    /// # Returns
83    ///
84    /// A new instance of `Delta`.
85    ///
86    /// # Examples
87    ///
88    /// ```
89    /// let config = DeltaConfig::Master { ip: "127.0.0.1".to_string(), port: 8080, db_path: "/path/to/db".to_string(), tables: vec!["table1".to_string()] };
90    /// let delta = Delta::new(config, None, None).await;
91    /// ```
92    pub async fn new(
93        delta_config: DeltaConfig,
94        custom_node_id: Option<String>,
95        error_handler: Option<Arc<dyn Fn(String) + Send + Sync + 'static>>
96    ) -> Delta {
97        // Generate UUID for the node_id or use custom node_id if provided
98        let node_id: String;
99        match custom_node_id {
100            Some(id) => node_id = id,
101            None => node_id = Uuid::new_v4().to_string(),
102        }
103
104        let (node_handler, node_receiver) = NodeHandler::new(error_handler);
105
106        let receiver_arc = Arc::new(Mutex::new(node_receiver));
107
108        let node_handle = node_handler
109            .start(receiver_arc, node_id.clone(), delta_config.clone())
110            .await;
111
112        let node_type = match delta_config {
113            DeltaConfig::Master {
114                ip: _,
115                port: _,
116                db_path: _,
117                tables: _,
118            } => NodeType::Master,
119            DeltaConfig::Auxiliary {
120                ip: _,
121                port: _,
122                db_path: _,
123                tables: _,
124                master_connection_params: _,
125            } => NodeType::Auxiliary,
126        };
127
128        Delta {
129            node_handler,
130            node_handle,
131            node_id,
132            node_type,
133        }
134    }
135
136    /// Shuts down the Delta node, disconnecting from all nodes and terminating running processes.
137    ///
138    /// # Returns
139    ///
140    /// A boolean indicating whether the shutdown was successful.
141    pub async fn shutdown(&self) -> Result<bool, Box<dyn Error>> {
142        let success = self.node_handler.shutdown().await?;
143        if success {
144            self.node_handle.abort();
145        }
146        Ok(success)
147    }
148
149    // TODO implement the adding and removing of tables
150    // Adds a table to the list of tracked tables
151    // fn add_table(table: String) {}
152
153    // TODO implement the adding and removing of tables
154    // Removes a table from the list of tracked tables (can put on the backlog for now)
155    // fn remove_table(table: String) {}
156
157    /// Returns the list of tables that the Delta node is currently tracking.
158    ///
159    /// # Returns
160    ///
161    /// A vector of strings representing the table names.
162    pub async fn get_tracked_tables(&self) -> Result<Vec<String>, Box<dyn Error>> {
163        let tracked_tables = self.node_handler.get_tracked_tables().await?;
164        Ok(tracked_tables)
165    }
166
167    /// Executes a write operation using the provided SQL string.
168    ///
169    /// # Arguments
170    ///
171    /// * `sql` - The SQL string to be executed.
172    ///
173    /// # Returns
174    ///
175    /// An integer representing the result of the write operation.
176    // For now, allow all SQL statements — in the future we could do checking to make sure that no unsupported things are used
177    pub async fn execute_write(&self, sql: String) -> Result<i32, Box<dyn Error>> {
178        let result = self.node_handler.execute_write(sql).await?;
179        Ok(result)
180    }
181
182    /// Executes a read operation using the provided SQL string.
183    ///
184    /// # Arguments
185    ///
186    /// * `sql` - The SQL string to be executed.
187    ///
188    /// # Returns
189    ///
190    /// A vector of vectors containing the results of the read operation.
191    pub async fn execute_read(&self, sql: String) -> Result<Vec<Vec<Value>>, Box<dyn Error>> {
192        let result = self.node_handler.execute_read(sql).await?;
193        Ok(result)
194    }
195
196    /// Returns the entire changelog of the Delta node.
197    ///
198    /// # Returns
199    ///
200    /// A vector of `Change` objects representing the changelog.
201    pub async fn get_changelog(&self) -> Result<Vec<Change>, Box<dyn Error>> {
202        let changelog = self.node_handler.get_changelog().await?;
203        Ok(changelog)
204    }
205
206    /// Connects to a node using the provided connection parameters.
207    ///
208    /// # Arguments
209    ///
210    /// * `connection_params` - The connection parameters for the node to connect to.
211    ///
212    /// # Returns
213    ///
214    /// A boolean indicating whether the connection was successful.
215    // TODO enable https connections
216    pub async fn connect(&self, connection_params: ConnectionParams) -> Result<bool, Box<dyn Error>> {
217        let success = self.node_handler.connect(connection_params).await?;
218        Ok(success)
219    }
220
221    /// Disconnects from the specified node.
222    ///
223    /// # Arguments
224    ///
225    /// * `node_id` - The ID of the node to disconnect from.
226    ///
227    /// # Returns
228    ///
229    /// A boolean indicating whether the disconnection was successful.
230    pub async fn disconnect(&self, node_id: String) -> Result<bool, Box<dyn Error>> {
231        let success = self.node_handler.disconnect(node_id).await?;
232        Ok(success)
233    }
234
235    /// Returns the list of connected nodes.
236    ///
237    /// # Returns
238    ///
239    /// A vector of strings representing the IDs of connected nodes.
240    pub async fn get_connected_devices(&self) -> Result<Vec<String>, Box<dyn Error>> {
241        let connected_nodes = self.node_handler.get_connected_nodes().await?;
242        Ok(connected_nodes)
243    }
244
245    /// Returns the ID of the Delta node.
246    ///
247    /// # Returns
248    ///
249    /// A string representing the node ID.
250    pub fn get_node_id(&self) -> String {
251        self.node_id.clone()
252    }
253
254    /// Registers an asynchronous callback function to be called on a database change.
255    ///
256    /// # Arguments
257    ///
258    /// * `callback` - The callback function to register.
259    ///
260    /// # Returns
261    ///
262    /// A boolean indicating whether the callback registration was successful.
263    pub async fn register_db_change_callback<F, Fut>(&self, callback: F) -> Result<bool, Box<dyn Error>>
264    where
265        F: Fn() -> Fut + Send + Sync + 'static,
266        Fut: Future<Output = ()> + Send + 'static,
267    {
268        let cb = Arc::new(move || {
269            let fut = callback();
270            Box::pin(fut) as Pin<Box<dyn Future<Output = ()> + Send>>
271        });
272
273        let success = self.node_handler.register_db_change_callback(cb).await?;
274
275        Ok(success)
276    }
277}