1use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
2use futures::{channel::mpsc::channel, try_join};
3use notify::RecursiveMode;
4use serde::Deserialize;
5use std::path::PathBuf;
6use std::sync::Arc;
7use tokio::runtime::Runtime;
8use tokio::sync::Mutex;
9
10use log::debug;
11
12use crate::chunker::{Chunker, InMemoryCache};
13use crate::file_watcher::async_watcher;
14use crate::indexer::check_index_once;
15use crate::syncer::{check_download_once, check_upload_once};
16
17const CHANNEL_SIZE: usize = 100;
18const INMEMORY_CACHE_MAX_REC: usize = 100000;
19const INMEMORY_CACHE_MAX_MEM: u64 = 100_000_000_000;
20
21pub mod chunker;
22pub mod connection;
23pub mod context;
24pub mod errors;
25pub mod file_watcher;
26pub mod indexer;
27pub mod models;
28pub mod registry;
29pub mod remote;
30pub mod schema;
31pub mod syncer;
32
33pub use context::{SyncContext, SyncStatusListener};
35pub use models::SyncStatus;
36
37pub fn extract_uid_from_jwt(token: &str) -> i32 {
40 #[derive(Deserialize)]
41 struct Claims {
42 uid: i32,
43 }
44
45 let parts: Vec<&str> = token.split('.').collect();
46 let payload = URL_SAFE_NO_PAD
47 .decode(parts[1])
48 .expect("Failed to decode JWT payload");
49 let claims: Claims = serde_json::from_slice(&payload).expect("Failed to parse JWT claims");
50
51 claims.uid
52}
53
54#[cfg(feature = "ffi")]
55uniffi::setup_scaffolding!();
56
57#[cfg_attr(feature = "ffi", uniffi::export)]
60pub fn run(
61 context: Arc<SyncContext>,
62 storage_dir: &str,
63 db_file_path: &str,
64 api_endpoint: &str,
65 remote_token: &str,
66 namespace_id: i32,
67 download_only: bool,
68) -> Result<(), errors::SyncError> {
69 Runtime::new()?.block_on(run_async(
70 context,
71 storage_dir,
72 db_file_path,
73 api_endpoint,
74 remote_token,
75 namespace_id,
76 download_only,
77 ))?;
78
79 Ok(())
80}
81
82#[cfg_attr(feature = "ffi", uniffi::export)]
87pub fn wait_remote_update(api_endpoint: &str, remote_token: &str) -> Result<(), errors::SyncError> {
88 Runtime::new()?.block_on(remote::Remote::new(api_endpoint, remote_token).poll())?;
89
90 Ok(())
91}
92
93#[cfg_attr(feature = "ffi", uniffi::export)]
97pub fn run_download_once(
98 storage_dir: &str,
99 db_file_path: &str,
100 api_endpoint: &str,
101 remote_token: &str,
102 namespace_id: i32,
103) -> Result<(), errors::SyncError> {
104 use std::env;
105
106 env::set_var("CARGO_LOG", "trace");
107
108 let storage_dir = &PathBuf::from(storage_dir);
109 let chunk_cache = InMemoryCache::new(INMEMORY_CACHE_MAX_REC, INMEMORY_CACHE_MAX_MEM);
110 let chunker = &mut Chunker::new(chunk_cache, storage_dir.clone());
111 let chunker = Arc::new(Mutex::new(chunker));
112 let remote = &remote::Remote::new(api_endpoint, remote_token);
113
114 let pool = connection::get_connection_pool(db_file_path)?;
115 debug!("Started connection pool for {:?}", db_file_path);
116
117 Runtime::new()?.block_on(check_download_once(
118 &pool,
119 Arc::clone(&chunker),
120 remote,
121 storage_dir,
122 namespace_id,
123 ))?;
124
125 Ok(())
126}
127
128#[cfg_attr(feature = "ffi", uniffi::export)]
132pub fn run_upload_once(
133 storage_dir: &str,
134 db_file_path: &str,
135 api_endpoint: &str,
136 remote_token: &str,
137 namespace_id: i32,
138) -> Result<(), errors::SyncError> {
139 let storage_dir = &PathBuf::from(storage_dir);
140 let chunk_cache = InMemoryCache::new(INMEMORY_CACHE_MAX_REC, INMEMORY_CACHE_MAX_MEM);
141 let chunker = &mut Chunker::new(chunk_cache, storage_dir.clone());
142 let chunker = Arc::new(Mutex::new(chunker));
143 let remote = &remote::Remote::new(api_endpoint, remote_token);
144
145 let pool = connection::get_connection_pool(db_file_path)?;
146 debug!("Started connection pool for {:?}", db_file_path);
147
148 check_index_once(&pool, storage_dir, namespace_id)?;
149
150 let runtime = Runtime::new()?;
151
152 if !runtime.block_on(check_upload_once(
155 &pool,
156 Arc::clone(&chunker),
157 remote,
158 namespace_id,
159 ))? {
160 runtime.block_on(check_upload_once(
161 &pool,
162 Arc::clone(&chunker),
163 remote,
164 namespace_id,
165 ))?;
166 }
167
168 Ok(())
169}
170
171#[allow(clippy::too_many_arguments)]
173pub async fn run_async(
174 context: Arc<SyncContext>,
175 storage_dir: &str,
176 db_file_path: &str,
177 api_endpoint: &str,
178 remote_token: &str,
179 namespace_id: i32,
180 download_only: bool,
181) -> Result<(), errors::SyncError> {
182 let token = context.token();
183 let listener = context.listener();
184
185 let (mut debouncer, local_file_update_rx) = async_watcher()?;
187 let (local_registry_updated_tx, local_registry_updated_rx) = channel(CHANNEL_SIZE);
188
189 let storage_dir = &PathBuf::from(storage_dir);
190 let chunk_cache = InMemoryCache::new(INMEMORY_CACHE_MAX_REC, INMEMORY_CACHE_MAX_MEM);
191 let chunker = &mut Chunker::new(chunk_cache, storage_dir.clone());
192 let remote = &remote::Remote::new(api_endpoint, remote_token);
193
194 let pool = connection::get_connection_pool(db_file_path)?;
195 debug!("Started connection pool for {:?}", db_file_path);
196
197 if !download_only {
198 debouncer
199 .watcher()
200 .watch(storage_dir, RecursiveMode::Recursive)?;
201 debug!("Started watcher on {:?}", storage_dir);
202 }
203
204 if let Some(ref cb) = listener {
206 cb.on_status_changed(SyncStatus::Syncing);
207 }
208
209 let indexer = indexer::run(
210 token.clone(),
211 listener.clone(),
212 &pool,
213 storage_dir,
214 namespace_id,
215 local_file_update_rx,
216 local_registry_updated_tx,
217 );
218 debug!("Started indexer on {:?}", storage_dir);
219
220 let syncer = syncer::run(
221 token.clone(),
222 listener.clone(),
223 &pool,
224 storage_dir,
225 namespace_id,
226 chunker,
227 remote,
228 local_registry_updated_rx,
229 download_only,
230 );
231 debug!("Started syncer");
232
233 let result = try_join!(indexer, syncer);
234
235 if let Some(ref cb) = listener {
237 match result {
238 Ok(_) => cb.on_complete(true, None),
239 Err(ref e) => cb.on_complete(false, Some(format!("{:?}", e))),
240 }
241 }
242
243 result?;
244 Ok(())
245}