1use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
2use futures::{channel::mpsc::channel, try_join};
3use notify::RecursiveMode;
4use serde::Deserialize;
5use std::path::PathBuf;
6use std::sync::Arc;
7use tokio::runtime::Runtime;
8use tokio::sync::Mutex;
9use tokio_util::sync::CancellationToken;
10
11use log::debug;
12
13use crate::chunker::{Chunker, InMemoryCache};
14use crate::file_watcher::async_watcher;
15use crate::indexer::check_index_once;
16use crate::syncer::{check_download_once, check_upload_once};
17
// Capacity of the bounded channel linking the indexer to the syncer.
const CHANNEL_SIZE: usize = 100;
// Maximum number of entries kept in the in-memory chunk cache.
const INMEMORY_CACHE_MAX_REC: usize = 100000;
// Cache memory budget — presumably bytes (~100 GB); confirm against InMemoryCache.
const INMEMORY_CACHE_MAX_MEM: u64 = 100_000_000_000;
21
22pub mod chunker;
23pub mod connection;
24pub mod context;
25pub mod errors;
26pub mod file_watcher;
27pub mod indexer;
28pub mod models;
29pub mod registry;
30pub mod remote;
31pub mod schema;
32pub mod syncer;
33
34pub use context::{SyncContext, SyncStatusListener};
36pub use models::SyncStatus;
37
38pub fn extract_uid_from_jwt(token: &str) -> i32 {
41 #[derive(Deserialize)]
42 struct Claims {
43 uid: i32,
44 }
45
46 let parts: Vec<&str> = token.split('.').collect();
47 let payload = URL_SAFE_NO_PAD
48 .decode(parts[1])
49 .expect("Failed to decode JWT payload");
50 let claims: Claims =
51 serde_json::from_slice(&payload).expect("Failed to parse JWT claims");
52
53 claims.uid
54}
55
// Generate the UniFFI scaffolding (FFI glue code) when built with the `ffi` feature.
#[cfg(feature = "ffi")]
uniffi::setup_scaffolding!();
58
59#[cfg_attr(feature = "ffi", uniffi::export)]
62pub fn run(
63 context: Arc<SyncContext>,
64 storage_dir: &str,
65 db_file_path: &str,
66 api_endpoint: &str,
67 remote_token: &str,
68 namespace_id: i32,
69 download_only: bool,
70) -> Result<(), errors::SyncError> {
71 let token = context.token();
72 let listener = context.listener();
73
74 Runtime::new()?.block_on(run_async(
75 token,
76 listener,
77 storage_dir,
78 db_file_path,
79 api_endpoint,
80 remote_token,
81 namespace_id,
82 download_only,
83 ))?;
84
85 Ok(())
86}
87
88#[cfg_attr(feature = "ffi", uniffi::export)]
93pub fn wait_remote_update(api_endpoint: &str, remote_token: &str) -> Result<(), errors::SyncError> {
94 Runtime::new()?.block_on(remote::Remote::new(api_endpoint, remote_token).poll())?;
95
96 Ok(())
97}
98
99#[cfg_attr(feature = "ffi", uniffi::export)]
103pub fn run_download_once(
104 storage_dir: &str,
105 db_file_path: &str,
106 api_endpoint: &str,
107 remote_token: &str,
108 namespace_id: i32,
109) -> Result<(), errors::SyncError> {
110 use std::env;
111
112 env::set_var("CARGO_LOG", "trace");
113
114 let storage_dir = &PathBuf::from(storage_dir);
115 let chunk_cache = InMemoryCache::new(INMEMORY_CACHE_MAX_REC, INMEMORY_CACHE_MAX_MEM);
116 let chunker = &mut Chunker::new(chunk_cache, storage_dir.clone());
117 let chunker = Arc::new(Mutex::new(chunker));
118 let remote = &remote::Remote::new(api_endpoint, remote_token);
119
120 let pool = connection::get_connection_pool(db_file_path)?;
121 debug!("Started connection pool for {:?}", db_file_path);
122
123 Runtime::new()?.block_on(check_download_once(
124 &pool,
125 Arc::clone(&chunker),
126 remote,
127 storage_dir,
128 namespace_id,
129 ))?;
130
131 Ok(())
132}
133
134#[cfg_attr(feature = "ffi", uniffi::export)]
138pub fn run_upload_once(
139 storage_dir: &str,
140 db_file_path: &str,
141 api_endpoint: &str,
142 remote_token: &str,
143 namespace_id: i32,
144) -> Result<(), errors::SyncError> {
145 let storage_dir = &PathBuf::from(storage_dir);
146 let chunk_cache = InMemoryCache::new(INMEMORY_CACHE_MAX_REC, INMEMORY_CACHE_MAX_MEM);
147 let chunker = &mut Chunker::new(chunk_cache, storage_dir.clone());
148 let chunker = Arc::new(Mutex::new(chunker));
149 let remote = &remote::Remote::new(api_endpoint, remote_token);
150
151 let pool = connection::get_connection_pool(db_file_path)?;
152 debug!("Started connection pool for {:?}", db_file_path);
153
154 check_index_once(&pool, storage_dir, namespace_id)?;
155
156 let runtime = Runtime::new()?;
157
158 if !runtime.block_on(check_upload_once(
161 &pool,
162 Arc::clone(&chunker),
163 remote,
164 namespace_id,
165 ))? {
166 runtime.block_on(check_upload_once(
167 &pool,
168 Arc::clone(&chunker),
169 remote,
170 namespace_id,
171 ))?;
172 }
173
174 Ok(())
175}
176
/// Core async sync pipeline: optionally watches `storage_dir` for local
/// changes, indexes them into the database at `db_file_path`, and syncs with
/// the remote at `api_endpoint` until `token` is cancelled or an error occurs.
///
/// Setup order is deliberate: the filesystem watcher is registered before the
/// indexer and syncer futures are driven, and the `local_registry_updated`
/// channel wires indexer output into the syncer.
///
/// If `listener` is provided it is notified when syncing starts and again on
/// completion with the overall outcome.
#[allow(clippy::too_many_arguments)]
pub async fn run_async(
    token: CancellationToken,
    listener: Option<Arc<dyn SyncStatusListener>>,
    storage_dir: &str,
    db_file_path: &str,
    api_endpoint: &str,
    remote_token: &str,
    namespace_id: i32,
    download_only: bool,
) -> Result<(), errors::SyncError> {
    // Debounced filesystem watcher; `local_file_update_rx` feeds the indexer.
    let (mut debouncer, local_file_update_rx) = async_watcher()?;
    // Indexer -> syncer notification channel, bounded to CHANNEL_SIZE.
    let (local_registry_updated_tx, local_registry_updated_rx) = channel(CHANNEL_SIZE);

    let storage_dir = &PathBuf::from(storage_dir);
    let chunk_cache = InMemoryCache::new(INMEMORY_CACHE_MAX_REC, INMEMORY_CACHE_MAX_MEM);
    let chunker = &mut Chunker::new(chunk_cache, storage_dir.clone());
    let remote = &remote::Remote::new(api_endpoint, remote_token);

    let pool = connection::get_connection_pool(db_file_path)?;
    debug!("Started connection pool for {:?}", db_file_path);

    // Watch the filesystem only when uploads are possible; a download-only run
    // never needs to react to local edits.
    if !download_only {
        debouncer
            .watcher()
            .watch(storage_dir, RecursiveMode::Recursive)?;
        debug!("Started watcher on {:?}", storage_dir);
    }

    // Tell the listener we are entering the syncing state.
    if let Some(ref cb) = listener {
        cb.on_status_changed(SyncStatus::Syncing);
    }

    // Build the indexer future (lazy — runs when awaited below).
    let indexer = indexer::run(
        token.clone(),
        listener.clone(),
        &pool,
        storage_dir,
        namespace_id,
        local_file_update_rx,
        local_registry_updated_tx,
    );
    debug!("Started indexer on {:?}", storage_dir);

    // Build the syncer future, consuming the indexer's notifications.
    let syncer = syncer::run(
        token.clone(),
        listener.clone(),
        &pool,
        storage_dir,
        namespace_id,
        chunker,
        remote,
        local_registry_updated_rx,
        download_only,
    );
    debug!("Started syncer");

    // Drive both halves concurrently; try_join! resolves with the first error.
    let result = try_join!(indexer, syncer);

    // Report the final outcome to the listener before propagating any error.
    if let Some(ref cb) = listener {
        match result {
            Ok(_) => cb.on_complete(true, None),
            Err(ref e) => cb.on_complete(false, Some(format!("{:?}", e))),
        }
    }

    result?;
    Ok(())
}