//! Crate root of `cooklang_sync_client` (lib.rs).

1use futures::{channel::mpsc::channel, try_join};
2use jsonwebtoken::{decode, Algorithm, DecodingKey, TokenData, Validation};
3use notify::RecursiveMode;
4use serde::{Deserialize, Serialize};
5use std::path::PathBuf;
6use std::sync::Arc;
7use tokio::runtime::Runtime;
8use tokio::sync::Mutex;
9use tokio_util::sync::CancellationToken;
10
11use log::debug;
12
13use crate::chunker::{Chunker, InMemoryCache};
14use crate::file_watcher::async_watcher;
15use crate::indexer::check_index_once;
16use crate::syncer::{check_download_once, check_upload_once};
17
/// Capacity of the channel the indexer uses to notify the syncer of local registry updates.
const CHANNEL_SIZE: usize = 100;
/// Maximum number of records held by the in-memory chunk cache.
const INMEMORY_CACHE_MAX_REC: usize = 100000;
/// Maximum memory budget (bytes) for the in-memory chunk cache (100 GB — effectively unbounded).
const INMEMORY_CACHE_MAX_MEM: u64 = 100_000_000_000;
/// Placeholder key: JWT signature validation is disabled, but `decode` still requires a key.
const DUMMY_SECRET: &[u8] = b"dummy_secret";
22
23pub mod chunker;
24pub mod connection;
25pub mod context;
26pub mod errors;
27pub mod file_watcher;
28pub mod indexer;
29pub mod models;
30pub mod registry;
31pub mod remote;
32pub mod schema;
33pub mod syncer;
34
35// Export SyncStatus and context types for external use
36pub use context::{SyncContext, SyncStatusListener};
37pub use models::SyncStatus;
38
39pub fn extract_uid_from_jwt(token: &str) -> i32 {
40    let mut validation = Validation::new(Algorithm::HS256);
41
42    // Disabling signature validation because we don't know real secret
43    validation.insecure_disable_signature_validation();
44
45    #[derive(Debug, Clone, Serialize, Deserialize)]
46    struct Claims {
47        uid: i32,
48    }
49
50    let token_data: TokenData<Claims> =
51        decode::<Claims>(token, &DecodingKey::from_secret(DUMMY_SECRET), &validation)
52            .expect("Failed to decode token");
53
54    token_data.claims.uid
55}
56
// Generates the UniFFI FFI scaffolding backing the `#[uniffi::export]` functions below.
uniffi::setup_scaffolding!();
58
59/// Synchronous alias to async run function.
60/// Intended to used by external (written in other languages) callers.
61#[uniffi::export]
62pub fn run(
63    context: Arc<SyncContext>,
64    storage_dir: &str,
65    db_file_path: &str,
66    api_endpoint: &str,
67    remote_token: &str,
68    namespace_id: i32,
69    download_only: bool,
70) -> Result<(), errors::SyncError> {
71    let token = context.token();
72    let listener = context.listener();
73
74    Runtime::new()?.block_on(run_async(
75        token,
76        listener,
77        storage_dir,
78        db_file_path,
79        api_endpoint,
80        remote_token,
81        namespace_id,
82        download_only,
83    ))?;
84
85    Ok(())
86}
87
88/// Connects to the server and waits either when `wait_time` expires or
89/// when there's a remote update for this client.
90/// Note, it doesn't do the update itself, you need to use `run_download_once`
91/// after this function completes.
92#[uniffi::export]
93pub fn wait_remote_update(api_endpoint: &str, remote_token: &str) -> Result<(), errors::SyncError> {
94    Runtime::new()?.block_on(remote::Remote::new(api_endpoint, remote_token).poll())?;
95
96    Ok(())
97}
98
99/// Runs one-off download of updates from remote server.
100/// Note, it's not very efficient as requires to re-initialize DB connection,
101/// chunker, remote client, etc every time it runs.
102#[uniffi::export]
103pub fn run_download_once(
104    storage_dir: &str,
105    db_file_path: &str,
106    api_endpoint: &str,
107    remote_token: &str,
108    namespace_id: i32,
109) -> Result<(), errors::SyncError> {
110    use std::env;
111
112    env::set_var("CARGO_LOG", "trace");
113
114    let storage_dir = &PathBuf::from(storage_dir);
115    let chunk_cache = InMemoryCache::new(INMEMORY_CACHE_MAX_REC, INMEMORY_CACHE_MAX_MEM);
116    let chunker = &mut Chunker::new(chunk_cache, storage_dir.clone());
117    let chunker = Arc::new(Mutex::new(chunker));
118    let remote = &remote::Remote::new(api_endpoint, remote_token);
119
120    let pool = connection::get_connection_pool(db_file_path)?;
121    debug!("Started connection pool for {:?}", db_file_path);
122
123    Runtime::new()?.block_on(check_download_once(
124        &pool,
125        Arc::clone(&chunker),
126        remote,
127        storage_dir,
128        namespace_id,
129    ))?;
130
131    Ok(())
132}
133
134/// Runs one-off upload of updates to remote server.
135/// Note, it's not very efficient as requires to re-initialize DB connection,
136/// chunker, remote client, etc every time it runs.
137#[uniffi::export]
138pub fn run_upload_once(
139    storage_dir: &str,
140    db_file_path: &str,
141    api_endpoint: &str,
142    remote_token: &str,
143    namespace_id: i32,
144) -> Result<(), errors::SyncError> {
145    let storage_dir = &PathBuf::from(storage_dir);
146    let chunk_cache = InMemoryCache::new(INMEMORY_CACHE_MAX_REC, INMEMORY_CACHE_MAX_MEM);
147    let chunker = &mut Chunker::new(chunk_cache, storage_dir.clone());
148    let chunker = Arc::new(Mutex::new(chunker));
149    let remote = &remote::Remote::new(api_endpoint, remote_token);
150
151    let pool = connection::get_connection_pool(db_file_path)?;
152    debug!("Started connection pool for {:?}", db_file_path);
153
154    check_index_once(&pool, storage_dir, namespace_id)?;
155
156    let runtime = Runtime::new()?;
157
158    // It requires first pass to upload missing chunks and second to
159    // commit and update `jid` to local records.
160    if !runtime.block_on(check_upload_once(
161        &pool,
162        Arc::clone(&chunker),
163        remote,
164        namespace_id,
165    ))? {
166        runtime.block_on(check_upload_once(
167            &pool,
168            Arc::clone(&chunker),
169            remote,
170            namespace_id,
171        ))?;
172    }
173
174    Ok(())
175}
176
/// Runs local files watch and sync from/to remote continuously.
///
/// Wires up the file watcher, indexer, and syncer, then awaits both the
/// indexer and syncer futures concurrently via `try_join!` until either
/// completes or one fails. When `listener` is provided, it is told when
/// syncing starts (after successful setup) and when the run completes.
/// With `download_only` set, the local file watcher is never started.
#[allow(clippy::too_many_arguments)]
pub async fn run_async(
    token: CancellationToken,
    listener: Option<Arc<dyn SyncStatusListener>>,
    storage_dir: &str,
    db_file_path: &str,
    api_endpoint: &str,
    remote_token: &str,
    namespace_id: i32,
    download_only: bool,
) -> Result<(), errors::SyncError> {
    // Initialize all components first
    let (mut debouncer, local_file_update_rx) = async_watcher()?;
    // Channel through which the indexer tells the syncer about local registry changes.
    let (local_registry_updated_tx, local_registry_updated_rx) = channel(CHANNEL_SIZE);

    let storage_dir = &PathBuf::from(storage_dir);
    let chunk_cache = InMemoryCache::new(INMEMORY_CACHE_MAX_REC, INMEMORY_CACHE_MAX_MEM);
    let chunker = &mut Chunker::new(chunk_cache, storage_dir.clone());
    let remote = &remote::Remote::new(api_endpoint, remote_token);

    let pool = connection::get_connection_pool(db_file_path)?;
    debug!("Started connection pool for {:?}", db_file_path);

    // Download-only runs never react to local edits, so skip the watcher.
    if !download_only {
        debouncer
            .watcher()
            .watch(storage_dir, RecursiveMode::Recursive)?;
        debug!("Started watcher on {:?}", storage_dir);
    }

    // Notify syncing status after successful initialization
    if let Some(ref cb) = listener {
        cb.on_status_changed(SyncStatus::Syncing);
    }

    // Build the two long-running futures; they only start executing once
    // awaited in `try_join!` below.
    let indexer = indexer::run(
        token.clone(),
        listener.clone(),
        &pool,
        storage_dir,
        namespace_id,
        local_file_update_rx,
        local_registry_updated_tx,
    );
    debug!("Started indexer on {:?}", storage_dir);

    let syncer = syncer::run(
        token.clone(),
        listener.clone(),
        &pool,
        storage_dir,
        namespace_id,
        chunker,
        remote,
        local_registry_updated_rx,
        download_only,
    );
    debug!("Started syncer");

    // Drive both concurrently; the first error short-circuits the join.
    let result = try_join!(indexer, syncer);

    // Notify completion (on_complete includes success status and optional error message)
    if let Some(ref cb) = listener {
        match result {
            Ok(_) => cb.on_complete(true, None),
            Err(ref e) => cb.on_complete(false, Some(format!("{:?}", e))),
        }
    }

    result?;
    Ok(())
}