//! Cooklang sync client library (`lib.rs`).

use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
use futures::{channel::mpsc::channel, try_join};
use notify::RecursiveMode;
use serde::Deserialize;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::runtime::Runtime;
use tokio::sync::Mutex;

use log::debug;

use crate::chunker::{Chunker, InMemoryCache};
use crate::file_watcher::async_watcher;
use crate::indexer::check_index_once;
use crate::syncer::{check_download_once, check_upload_once};

const CHANNEL_SIZE: usize = 100;
const INMEMORY_CACHE_MAX_REC: usize = 100000;
const INMEMORY_CACHE_MAX_MEM: u64 = 100_000_000_000;

pub mod chunker;
pub mod connection;
pub mod context;
pub mod errors;
pub mod file_watcher;
pub mod indexer;
pub mod models;
pub mod registry;
pub mod remote;
pub mod schema;
pub mod syncer;

// Export SyncStatus and context types for external use
pub use context::{SyncContext, SyncStatusListener};
pub use models::SyncStatus;

37/// Extracts the user ID from a JWT token without signature verification.
38/// JWT format: header.payload.signature (base64url encoded)
39pub fn extract_uid_from_jwt(token: &str) -> i32 {
40    #[derive(Deserialize)]
41    struct Claims {
42        uid: i32,
43    }
44
45    let parts: Vec<&str> = token.split('.').collect();
46    let payload = URL_SAFE_NO_PAD
47        .decode(parts[1])
48        .expect("Failed to decode JWT payload");
49    let claims: Claims = serde_json::from_slice(&payload).expect("Failed to parse JWT claims");
50
51    claims.uid
52}
53
// Generate the uniffi FFI scaffolding (entry points and type conversions)
// only when the crate is built with the `ffi` feature enabled.
#[cfg(feature = "ffi")]
uniffi::setup_scaffolding!();
56
57/// Synchronous alias to async run function.
58/// Intended to used by external (written in other languages) callers.
59#[cfg_attr(feature = "ffi", uniffi::export)]
60pub fn run(
61    context: Arc<SyncContext>,
62    storage_dir: &str,
63    db_file_path: &str,
64    api_endpoint: &str,
65    remote_token: &str,
66    namespace_id: i32,
67    download_only: bool,
68) -> Result<(), errors::SyncError> {
69    Runtime::new()?.block_on(run_async(
70        context,
71        storage_dir,
72        db_file_path,
73        api_endpoint,
74        remote_token,
75        namespace_id,
76        download_only,
77    ))?;
78
79    Ok(())
80}
81
82/// Connects to the server and waits either when `wait_time` expires or
83/// when there's a remote update for this client.
84/// Note, it doesn't do the update itself, you need to use `run_download_once`
85/// after this function completes.
86#[cfg_attr(feature = "ffi", uniffi::export)]
87pub fn wait_remote_update(api_endpoint: &str, remote_token: &str) -> Result<(), errors::SyncError> {
88    Runtime::new()?.block_on(remote::Remote::new(api_endpoint, remote_token).poll())?;
89
90    Ok(())
91}
92
93/// Runs one-off download of updates from remote server.
94/// Note, it's not very efficient as requires to re-initialize DB connection,
95/// chunker, remote client, etc every time it runs.
96#[cfg_attr(feature = "ffi", uniffi::export)]
97pub fn run_download_once(
98    storage_dir: &str,
99    db_file_path: &str,
100    api_endpoint: &str,
101    remote_token: &str,
102    namespace_id: i32,
103) -> Result<(), errors::SyncError> {
104    use std::env;
105
106    env::set_var("CARGO_LOG", "trace");
107
108    let storage_dir = &PathBuf::from(storage_dir);
109    let chunk_cache = InMemoryCache::new(INMEMORY_CACHE_MAX_REC, INMEMORY_CACHE_MAX_MEM);
110    let chunker = &mut Chunker::new(chunk_cache, storage_dir.clone());
111    let chunker = Arc::new(Mutex::new(chunker));
112    let remote = &remote::Remote::new(api_endpoint, remote_token);
113
114    let pool = connection::get_connection_pool(db_file_path)?;
115    debug!("Started connection pool for {:?}", db_file_path);
116
117    Runtime::new()?.block_on(check_download_once(
118        &pool,
119        Arc::clone(&chunker),
120        remote,
121        storage_dir,
122        namespace_id,
123    ))?;
124
125    Ok(())
126}
127
128/// Runs one-off upload of updates to remote server.
129/// Note, it's not very efficient as requires to re-initialize DB connection,
130/// chunker, remote client, etc every time it runs.
131#[cfg_attr(feature = "ffi", uniffi::export)]
132pub fn run_upload_once(
133    storage_dir: &str,
134    db_file_path: &str,
135    api_endpoint: &str,
136    remote_token: &str,
137    namespace_id: i32,
138) -> Result<(), errors::SyncError> {
139    let storage_dir = &PathBuf::from(storage_dir);
140    let chunk_cache = InMemoryCache::new(INMEMORY_CACHE_MAX_REC, INMEMORY_CACHE_MAX_MEM);
141    let chunker = &mut Chunker::new(chunk_cache, storage_dir.clone());
142    let chunker = Arc::new(Mutex::new(chunker));
143    let remote = &remote::Remote::new(api_endpoint, remote_token);
144
145    let pool = connection::get_connection_pool(db_file_path)?;
146    debug!("Started connection pool for {:?}", db_file_path);
147
148    check_index_once(&pool, storage_dir, namespace_id)?;
149
150    let runtime = Runtime::new()?;
151
152    // It requires first pass to upload missing chunks and second to
153    // commit and update `jid` to local records.
154    if !runtime.block_on(check_upload_once(
155        &pool,
156        Arc::clone(&chunker),
157        remote,
158        namespace_id,
159    ))? {
160        runtime.block_on(check_upload_once(
161            &pool,
162            Arc::clone(&chunker),
163            remote,
164            namespace_id,
165        ))?;
166    }
167
168    Ok(())
169}
170
171/// Runs local files watch and sync from/to remote continuously.
172#[allow(clippy::too_many_arguments)]
173pub async fn run_async(
174    context: Arc<SyncContext>,
175    storage_dir: &str,
176    db_file_path: &str,
177    api_endpoint: &str,
178    remote_token: &str,
179    namespace_id: i32,
180    download_only: bool,
181) -> Result<(), errors::SyncError> {
182    let token = context.token();
183    let listener = context.listener();
184
185    // Initialize all components first
186    let (mut debouncer, local_file_update_rx) = async_watcher()?;
187    let (local_registry_updated_tx, local_registry_updated_rx) = channel(CHANNEL_SIZE);
188
189    let storage_dir = &PathBuf::from(storage_dir);
190    let chunk_cache = InMemoryCache::new(INMEMORY_CACHE_MAX_REC, INMEMORY_CACHE_MAX_MEM);
191    let chunker = &mut Chunker::new(chunk_cache, storage_dir.clone());
192    let remote = &remote::Remote::new(api_endpoint, remote_token);
193
194    let pool = connection::get_connection_pool(db_file_path)?;
195    debug!("Started connection pool for {:?}", db_file_path);
196
197    if !download_only {
198        debouncer
199            .watcher()
200            .watch(storage_dir, RecursiveMode::Recursive)?;
201        debug!("Started watcher on {:?}", storage_dir);
202    }
203
204    // Notify syncing status after successful initialization
205    if let Some(ref cb) = listener {
206        cb.on_status_changed(SyncStatus::Syncing);
207    }
208
209    let indexer = indexer::run(
210        token.clone(),
211        listener.clone(),
212        &pool,
213        storage_dir,
214        namespace_id,
215        local_file_update_rx,
216        local_registry_updated_tx,
217    );
218    debug!("Started indexer on {:?}", storage_dir);
219
220    let syncer = syncer::run(
221        token.clone(),
222        listener.clone(),
223        &pool,
224        storage_dir,
225        namespace_id,
226        chunker,
227        remote,
228        local_registry_updated_rx,
229        download_only,
230    );
231    debug!("Started syncer");
232
233    let result = try_join!(indexer, syncer);
234
235    // Notify completion (on_complete includes success status and optional error message)
236    if let Some(ref cb) = listener {
237        match result {
238            Ok(_) => cb.on_complete(true, None),
239            Err(ref e) => cb.on_complete(false, Some(format!("{:?}", e))),
240        }
241    }
242
243    result?;
244    Ok(())
245}