Skip to main content

cooklang_sync_client/
lib.rs

1use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
2use futures::{channel::mpsc::channel, try_join};
3use notify::RecursiveMode;
4use serde::Deserialize;
5use std::path::PathBuf;
6use std::sync::Arc;
7use tokio::runtime::Runtime;
8use tokio::sync::Mutex;
9
10use log::debug;
11
12use crate::chunker::{Chunker, InMemoryCache};
13use crate::file_watcher::async_watcher;
14use crate::indexer::check_index_once;
15use crate::syncer::{check_download_once, check_upload_once};
16
/// Buffer capacity of the channel created in `run_async` that connects the
/// indexer (sender side) to the syncer (receiver side).
const CHANNEL_SIZE: usize = 100;
/// First limit passed to `InMemoryCache::new`.
/// NOTE(review): presumably the maximum number of cached records — confirm.
const INMEMORY_CACHE_MAX_REC: usize = 100_000;
/// Second limit passed to `InMemoryCache::new`.
/// NOTE(review): presumably a size in bytes, which would make this 100 GB —
/// confirm the unit and whether such a high ceiling is intended.
const INMEMORY_CACHE_MAX_MEM: u64 = 100_000_000_000;
20
21pub mod chunker;
22pub mod connection;
23pub mod context;
24pub mod errors;
25pub mod file_watcher;
26pub mod indexer;
27pub mod models;
28pub mod registry;
29pub mod remote;
30pub mod schema;
31pub mod syncer;
32
33// Export SyncStatus and context types for external use
34pub use context::{SyncContext, SyncStatusListener};
35pub use models::SyncStatus;
36
37/// Extracts the user ID from a JWT token without signature verification.
38/// JWT format: header.payload.signature (base64url encoded)
39pub fn extract_uid_from_jwt(token: &str) -> i32 {
40    #[derive(Deserialize)]
41    struct Claims {
42        uid: i32,
43    }
44
45    let parts: Vec<&str> = token.split('.').collect();
46    let payload = URL_SAFE_NO_PAD
47        .decode(parts[1])
48        .expect("Failed to decode JWT payload");
49    let claims: Claims =
50        serde_json::from_slice(&payload).expect("Failed to parse JWT claims");
51
52    claims.uid
53}
54
55#[cfg(feature = "ffi")]
56uniffi::setup_scaffolding!();
57
58/// Synchronous alias to async run function.
59/// Intended to used by external (written in other languages) callers.
60#[cfg_attr(feature = "ffi", uniffi::export)]
61pub fn run(
62    context: Arc<SyncContext>,
63    storage_dir: &str,
64    db_file_path: &str,
65    api_endpoint: &str,
66    remote_token: &str,
67    namespace_id: i32,
68    download_only: bool,
69) -> Result<(), errors::SyncError> {
70    Runtime::new()?.block_on(run_async(
71        context,
72        storage_dir,
73        db_file_path,
74        api_endpoint,
75        remote_token,
76        namespace_id,
77        download_only,
78    ))?;
79
80    Ok(())
81}
82
83/// Connects to the server and waits either when `wait_time` expires or
84/// when there's a remote update for this client.
85/// Note, it doesn't do the update itself, you need to use `run_download_once`
86/// after this function completes.
87#[cfg_attr(feature = "ffi", uniffi::export)]
88pub fn wait_remote_update(api_endpoint: &str, remote_token: &str) -> Result<(), errors::SyncError> {
89    Runtime::new()?.block_on(remote::Remote::new(api_endpoint, remote_token).poll())?;
90
91    Ok(())
92}
93
94/// Runs one-off download of updates from remote server.
95/// Note, it's not very efficient as requires to re-initialize DB connection,
96/// chunker, remote client, etc every time it runs.
97#[cfg_attr(feature = "ffi", uniffi::export)]
98pub fn run_download_once(
99    storage_dir: &str,
100    db_file_path: &str,
101    api_endpoint: &str,
102    remote_token: &str,
103    namespace_id: i32,
104) -> Result<(), errors::SyncError> {
105    use std::env;
106
107    env::set_var("CARGO_LOG", "trace");
108
109    let storage_dir = &PathBuf::from(storage_dir);
110    let chunk_cache = InMemoryCache::new(INMEMORY_CACHE_MAX_REC, INMEMORY_CACHE_MAX_MEM);
111    let chunker = &mut Chunker::new(chunk_cache, storage_dir.clone());
112    let chunker = Arc::new(Mutex::new(chunker));
113    let remote = &remote::Remote::new(api_endpoint, remote_token);
114
115    let pool = connection::get_connection_pool(db_file_path)?;
116    debug!("Started connection pool for {:?}", db_file_path);
117
118    Runtime::new()?.block_on(check_download_once(
119        &pool,
120        Arc::clone(&chunker),
121        remote,
122        storage_dir,
123        namespace_id,
124    ))?;
125
126    Ok(())
127}
128
129/// Runs one-off upload of updates to remote server.
130/// Note, it's not very efficient as requires to re-initialize DB connection,
131/// chunker, remote client, etc every time it runs.
132#[cfg_attr(feature = "ffi", uniffi::export)]
133pub fn run_upload_once(
134    storage_dir: &str,
135    db_file_path: &str,
136    api_endpoint: &str,
137    remote_token: &str,
138    namespace_id: i32,
139) -> Result<(), errors::SyncError> {
140    let storage_dir = &PathBuf::from(storage_dir);
141    let chunk_cache = InMemoryCache::new(INMEMORY_CACHE_MAX_REC, INMEMORY_CACHE_MAX_MEM);
142    let chunker = &mut Chunker::new(chunk_cache, storage_dir.clone());
143    let chunker = Arc::new(Mutex::new(chunker));
144    let remote = &remote::Remote::new(api_endpoint, remote_token);
145
146    let pool = connection::get_connection_pool(db_file_path)?;
147    debug!("Started connection pool for {:?}", db_file_path);
148
149    check_index_once(&pool, storage_dir, namespace_id)?;
150
151    let runtime = Runtime::new()?;
152
153    // It requires first pass to upload missing chunks and second to
154    // commit and update `jid` to local records.
155    if !runtime.block_on(check_upload_once(
156        &pool,
157        Arc::clone(&chunker),
158        remote,
159        namespace_id,
160    ))? {
161        runtime.block_on(check_upload_once(
162            &pool,
163            Arc::clone(&chunker),
164            remote,
165            namespace_id,
166        ))?;
167    }
168
169    Ok(())
170}
171
172/// Runs local files watch and sync from/to remote continuously.
173#[allow(clippy::too_many_arguments)]
174pub async fn run_async(
175    context: Arc<SyncContext>,
176    storage_dir: &str,
177    db_file_path: &str,
178    api_endpoint: &str,
179    remote_token: &str,
180    namespace_id: i32,
181    download_only: bool,
182) -> Result<(), errors::SyncError> {
183    let token = context.token();
184    let listener = context.listener();
185
186    // Initialize all components first
187    let (mut debouncer, local_file_update_rx) = async_watcher()?;
188    let (local_registry_updated_tx, local_registry_updated_rx) = channel(CHANNEL_SIZE);
189
190    let storage_dir = &PathBuf::from(storage_dir);
191    let chunk_cache = InMemoryCache::new(INMEMORY_CACHE_MAX_REC, INMEMORY_CACHE_MAX_MEM);
192    let chunker = &mut Chunker::new(chunk_cache, storage_dir.clone());
193    let remote = &remote::Remote::new(api_endpoint, remote_token);
194
195    let pool = connection::get_connection_pool(db_file_path)?;
196    debug!("Started connection pool for {:?}", db_file_path);
197
198    if !download_only {
199        debouncer
200            .watcher()
201            .watch(storage_dir, RecursiveMode::Recursive)?;
202        debug!("Started watcher on {:?}", storage_dir);
203    }
204
205    // Notify syncing status after successful initialization
206    if let Some(ref cb) = listener {
207        cb.on_status_changed(SyncStatus::Syncing);
208    }
209
210    let indexer = indexer::run(
211        token.clone(),
212        listener.clone(),
213        &pool,
214        storage_dir,
215        namespace_id,
216        local_file_update_rx,
217        local_registry_updated_tx,
218    );
219    debug!("Started indexer on {:?}", storage_dir);
220
221    let syncer = syncer::run(
222        token.clone(),
223        listener.clone(),
224        &pool,
225        storage_dir,
226        namespace_id,
227        chunker,
228        remote,
229        local_registry_updated_rx,
230        download_only,
231    );
232    debug!("Started syncer");
233
234    let result = try_join!(indexer, syncer);
235
236    // Notify completion (on_complete includes success status and optional error message)
237    if let Some(ref cb) = listener {
238        match result {
239            Ok(_) => cb.on_complete(true, None),
240            Err(ref e) => cb.on_complete(false, Some(format!("{:?}", e))),
241        }
242    }
243
244    result?;
245    Ok(())
246}