1use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
2use futures::{channel::mpsc::channel, try_join};
3use notify::RecursiveMode;
4use serde::Deserialize;
5use std::path::PathBuf;
6use std::sync::Arc;
7use tokio::runtime::Runtime;
8use tokio::sync::Mutex;
9
10use log::debug;
11
12use crate::chunker::{Chunker, InMemoryCache};
13use crate::file_watcher::async_watcher;
14use crate::indexer::check_index_once;
15use crate::syncer::{check_download_once, check_upload_once};
16
17const CHANNEL_SIZE: usize = 100;
18const INMEMORY_CACHE_MAX_REC: usize = 100000;
19const INMEMORY_CACHE_MAX_MEM: u64 = 100_000_000_000;
20
21pub mod chunker;
22pub mod connection;
23pub mod context;
24pub mod errors;
25pub mod file_watcher;
26pub mod indexer;
27pub mod models;
28pub mod registry;
29pub mod remote;
30pub mod schema;
31pub mod syncer;
32
33pub use context::{SyncContext, SyncStatusListener};
35pub use models::SyncStatus;
36
37pub fn extract_uid_from_jwt(token: &str) -> i32 {
40 #[derive(Deserialize)]
41 struct Claims {
42 uid: i32,
43 }
44
45 let parts: Vec<&str> = token.split('.').collect();
46 let payload = URL_SAFE_NO_PAD
47 .decode(parts[1])
48 .expect("Failed to decode JWT payload");
49 let claims: Claims =
50 serde_json::from_slice(&payload).expect("Failed to parse JWT claims");
51
52 claims.uid
53}
54
// Generate the UniFFI scaffolding for the `uniffi::export`ed functions in
// this crate, only when foreign-language bindings are enabled.
#[cfg(feature = "ffi")]
uniffi::setup_scaffolding!();
57
58#[cfg_attr(feature = "ffi", uniffi::export)]
61pub fn run(
62 context: Arc<SyncContext>,
63 storage_dir: &str,
64 db_file_path: &str,
65 api_endpoint: &str,
66 remote_token: &str,
67 namespace_id: i32,
68 download_only: bool,
69) -> Result<(), errors::SyncError> {
70 Runtime::new()?.block_on(run_async(
71 context,
72 storage_dir,
73 db_file_path,
74 api_endpoint,
75 remote_token,
76 namespace_id,
77 download_only,
78 ))?;
79
80 Ok(())
81}
82
83#[cfg_attr(feature = "ffi", uniffi::export)]
88pub fn wait_remote_update(api_endpoint: &str, remote_token: &str) -> Result<(), errors::SyncError> {
89 Runtime::new()?.block_on(remote::Remote::new(api_endpoint, remote_token).poll())?;
90
91 Ok(())
92}
93
94#[cfg_attr(feature = "ffi", uniffi::export)]
98pub fn run_download_once(
99 storage_dir: &str,
100 db_file_path: &str,
101 api_endpoint: &str,
102 remote_token: &str,
103 namespace_id: i32,
104) -> Result<(), errors::SyncError> {
105 use std::env;
106
107 env::set_var("CARGO_LOG", "trace");
108
109 let storage_dir = &PathBuf::from(storage_dir);
110 let chunk_cache = InMemoryCache::new(INMEMORY_CACHE_MAX_REC, INMEMORY_CACHE_MAX_MEM);
111 let chunker = &mut Chunker::new(chunk_cache, storage_dir.clone());
112 let chunker = Arc::new(Mutex::new(chunker));
113 let remote = &remote::Remote::new(api_endpoint, remote_token);
114
115 let pool = connection::get_connection_pool(db_file_path)?;
116 debug!("Started connection pool for {:?}", db_file_path);
117
118 Runtime::new()?.block_on(check_download_once(
119 &pool,
120 Arc::clone(&chunker),
121 remote,
122 storage_dir,
123 namespace_id,
124 ))?;
125
126 Ok(())
127}
128
129#[cfg_attr(feature = "ffi", uniffi::export)]
133pub fn run_upload_once(
134 storage_dir: &str,
135 db_file_path: &str,
136 api_endpoint: &str,
137 remote_token: &str,
138 namespace_id: i32,
139) -> Result<(), errors::SyncError> {
140 let storage_dir = &PathBuf::from(storage_dir);
141 let chunk_cache = InMemoryCache::new(INMEMORY_CACHE_MAX_REC, INMEMORY_CACHE_MAX_MEM);
142 let chunker = &mut Chunker::new(chunk_cache, storage_dir.clone());
143 let chunker = Arc::new(Mutex::new(chunker));
144 let remote = &remote::Remote::new(api_endpoint, remote_token);
145
146 let pool = connection::get_connection_pool(db_file_path)?;
147 debug!("Started connection pool for {:?}", db_file_path);
148
149 check_index_once(&pool, storage_dir, namespace_id)?;
150
151 let runtime = Runtime::new()?;
152
153 if !runtime.block_on(check_upload_once(
156 &pool,
157 Arc::clone(&chunker),
158 remote,
159 namespace_id,
160 ))? {
161 runtime.block_on(check_upload_once(
162 &pool,
163 Arc::clone(&chunker),
164 remote,
165 namespace_id,
166 ))?;
167 }
168
169 Ok(())
170}
171
/// Asynchronous sync driver: wires the file watcher, indexer, and syncer
/// together and runs indexer + syncer concurrently until both complete or
/// one fails.
///
/// When `download_only` is true the filesystem watcher is never registered,
/// so only remote changes are applied. Status and completion are reported
/// through the context's optional [`SyncStatusListener`].
#[allow(clippy::too_many_arguments)]
pub async fn run_async(
    context: Arc<SyncContext>,
    storage_dir: &str,
    db_file_path: &str,
    api_endpoint: &str,
    remote_token: &str,
    namespace_id: i32,
    download_only: bool,
) -> Result<(), errors::SyncError> {
    let token = context.token();
    let listener = context.listener();

    // Debounced filesystem watcher; its receiver feeds local file events to
    // the indexer below.
    let (mut debouncer, local_file_update_rx) = async_watcher()?;
    // Bounded channel the indexer uses to tell the syncer that the local
    // registry changed and an upload pass may be needed.
    let (local_registry_updated_tx, local_registry_updated_rx) = channel(CHANNEL_SIZE);

    let storage_dir = &PathBuf::from(storage_dir);
    let chunk_cache = InMemoryCache::new(INMEMORY_CACHE_MAX_REC, INMEMORY_CACHE_MAX_MEM);
    let chunker = &mut Chunker::new(chunk_cache, storage_dir.clone());
    let remote = &remote::Remote::new(api_endpoint, remote_token);

    let pool = connection::get_connection_pool(db_file_path)?;
    debug!("Started connection pool for {:?}", db_file_path);

    // Only watch local files when local changes should be picked up; in
    // download-only mode the watcher stays unregistered.
    if !download_only {
        debouncer
            .watcher()
            .watch(storage_dir, RecursiveMode::Recursive)?;
        debug!("Started watcher on {:?}", storage_dir);
    }

    // Tell the embedding application that syncing has started.
    if let Some(ref cb) = listener {
        cb.on_status_changed(SyncStatus::Syncing);
    }

    let indexer = indexer::run(
        token.clone(),
        listener.clone(),
        &pool,
        storage_dir,
        namespace_id,
        local_file_update_rx,
        local_registry_updated_tx,
    );
    debug!("Started indexer on {:?}", storage_dir);

    let syncer = syncer::run(
        token.clone(),
        listener.clone(),
        &pool,
        storage_dir,
        namespace_id,
        chunker,
        remote,
        local_registry_updated_rx,
        download_only,
    );
    debug!("Started syncer");

    // Run both futures concurrently; try_join! resolves to Err as soon as
    // either one fails.
    let result = try_join!(indexer, syncer);

    // Report the outcome to the listener before propagating any error, so
    // the embedder always gets a completion callback.
    if let Some(ref cb) = listener {
        match result {
            Ok(_) => cb.on_complete(true, None),
            Err(ref e) => cb.on_complete(false, Some(format!("{:?}", e))),
        }
    }

    result?;
    Ok(())
}