1use futures::{channel::mpsc::channel, try_join};
2use jsonwebtoken::{decode, Algorithm, DecodingKey, TokenData, Validation};
3use notify::RecursiveMode;
4use serde::{Deserialize, Serialize};
5use std::path::PathBuf;
6use std::sync::Arc;
7use tokio::runtime::Runtime;
8use tokio::sync::Mutex;
9use tokio_util::sync::CancellationToken;
10
11use log::debug;
12
13use crate::chunker::{Chunker, InMemoryCache};
14use crate::file_watcher::async_watcher;
15use crate::indexer::check_index_once;
16use crate::syncer::{check_download_once, check_upload_once};
17
18const CHANNEL_SIZE: usize = 100;
19const INMEMORY_CACHE_MAX_REC: usize = 100000;
20const INMEMORY_CACHE_MAX_MEM: u64 = 100_000_000_000;
21const DUMMY_SECRET: &[u8] = b"dummy_secret";
22
23pub mod chunker;
24pub mod connection;
25pub mod context;
26pub mod errors;
27pub mod file_watcher;
28pub mod indexer;
29pub mod models;
30pub mod registry;
31pub mod remote;
32pub mod schema;
33pub mod syncer;
34
35pub use context::{SyncContext, SyncStatusListener};
37pub use models::SyncStatus;
38
39pub fn extract_uid_from_jwt(token: &str) -> i32 {
40 let mut validation = Validation::new(Algorithm::HS256);
41
42 validation.insecure_disable_signature_validation();
44
45 #[derive(Debug, Clone, Serialize, Deserialize)]
46 struct Claims {
47 uid: i32,
48 }
49
50 let token_data: TokenData<Claims> =
51 decode::<Claims>(token, &DecodingKey::from_secret(DUMMY_SECRET), &validation)
52 .expect("Failed to decode token");
53
54 token_data.claims.uid
55}
56
57uniffi::setup_scaffolding!();
58
59#[uniffi::export]
62pub fn run(
63 context: Arc<SyncContext>,
64 storage_dir: &str,
65 db_file_path: &str,
66 api_endpoint: &str,
67 remote_token: &str,
68 namespace_id: i32,
69 download_only: bool,
70) -> Result<(), errors::SyncError> {
71 let token = context.token();
72 let listener = context.listener();
73
74 Runtime::new()?.block_on(run_async(
75 token,
76 listener,
77 storage_dir,
78 db_file_path,
79 api_endpoint,
80 remote_token,
81 namespace_id,
82 download_only,
83 ))?;
84
85 Ok(())
86}
87
88#[uniffi::export]
93pub fn wait_remote_update(api_endpoint: &str, remote_token: &str) -> Result<(), errors::SyncError> {
94 Runtime::new()?.block_on(remote::Remote::new(api_endpoint, remote_token).poll())?;
95
96 Ok(())
97}
98
99#[uniffi::export]
103pub fn run_download_once(
104 storage_dir: &str,
105 db_file_path: &str,
106 api_endpoint: &str,
107 remote_token: &str,
108 namespace_id: i32,
109) -> Result<(), errors::SyncError> {
110 use std::env;
111
112 env::set_var("CARGO_LOG", "trace");
113
114 let storage_dir = &PathBuf::from(storage_dir);
115 let chunk_cache = InMemoryCache::new(INMEMORY_CACHE_MAX_REC, INMEMORY_CACHE_MAX_MEM);
116 let chunker = &mut Chunker::new(chunk_cache, storage_dir.clone());
117 let chunker = Arc::new(Mutex::new(chunker));
118 let remote = &remote::Remote::new(api_endpoint, remote_token);
119
120 let pool = connection::get_connection_pool(db_file_path)?;
121 debug!("Started connection pool for {:?}", db_file_path);
122
123 Runtime::new()?.block_on(check_download_once(
124 &pool,
125 Arc::clone(&chunker),
126 remote,
127 storage_dir,
128 namespace_id,
129 ))?;
130
131 Ok(())
132}
133
134#[uniffi::export]
138pub fn run_upload_once(
139 storage_dir: &str,
140 db_file_path: &str,
141 api_endpoint: &str,
142 remote_token: &str,
143 namespace_id: i32,
144) -> Result<(), errors::SyncError> {
145 let storage_dir = &PathBuf::from(storage_dir);
146 let chunk_cache = InMemoryCache::new(INMEMORY_CACHE_MAX_REC, INMEMORY_CACHE_MAX_MEM);
147 let chunker = &mut Chunker::new(chunk_cache, storage_dir.clone());
148 let chunker = Arc::new(Mutex::new(chunker));
149 let remote = &remote::Remote::new(api_endpoint, remote_token);
150
151 let pool = connection::get_connection_pool(db_file_path)?;
152 debug!("Started connection pool for {:?}", db_file_path);
153
154 check_index_once(&pool, storage_dir, namespace_id)?;
155
156 let runtime = Runtime::new()?;
157
158 if !runtime.block_on(check_upload_once(
161 &pool,
162 Arc::clone(&chunker),
163 remote,
164 namespace_id,
165 ))? {
166 runtime.block_on(check_upload_once(
167 &pool,
168 Arc::clone(&chunker),
169 remote,
170 namespace_id,
171 ))?;
172 }
173
174 Ok(())
175}
176
177#[allow(clippy::too_many_arguments)]
179pub async fn run_async(
180 token: CancellationToken,
181 listener: Option<Arc<dyn SyncStatusListener>>,
182 storage_dir: &str,
183 db_file_path: &str,
184 api_endpoint: &str,
185 remote_token: &str,
186 namespace_id: i32,
187 download_only: bool,
188) -> Result<(), errors::SyncError> {
189 let (mut debouncer, local_file_update_rx) = async_watcher()?;
191 let (local_registry_updated_tx, local_registry_updated_rx) = channel(CHANNEL_SIZE);
192
193 let storage_dir = &PathBuf::from(storage_dir);
194 let chunk_cache = InMemoryCache::new(INMEMORY_CACHE_MAX_REC, INMEMORY_CACHE_MAX_MEM);
195 let chunker = &mut Chunker::new(chunk_cache, storage_dir.clone());
196 let remote = &remote::Remote::new(api_endpoint, remote_token);
197
198 let pool = connection::get_connection_pool(db_file_path)?;
199 debug!("Started connection pool for {:?}", db_file_path);
200
201 if !download_only {
202 debouncer
203 .watcher()
204 .watch(storage_dir, RecursiveMode::Recursive)?;
205 debug!("Started watcher on {:?}", storage_dir);
206 }
207
208 if let Some(ref cb) = listener {
210 cb.on_status_changed(SyncStatus::Syncing);
211 }
212
213 let indexer = indexer::run(
214 token.clone(),
215 listener.clone(),
216 &pool,
217 storage_dir,
218 namespace_id,
219 local_file_update_rx,
220 local_registry_updated_tx,
221 );
222 debug!("Started indexer on {:?}", storage_dir);
223
224 let syncer = syncer::run(
225 token.clone(),
226 listener.clone(),
227 &pool,
228 storage_dir,
229 namespace_id,
230 chunker,
231 remote,
232 local_registry_updated_rx,
233 download_only,
234 );
235 debug!("Started syncer");
236
237 let result = try_join!(indexer, syncer);
238
239 if let Some(ref cb) = listener {
241 match result {
242 Ok(_) => cb.on_complete(true, None),
243 Err(ref e) => cb.on_complete(false, Some(format!("{:?}", e))),
244 }
245 }
246
247 result?;
248 Ok(())
249}