1use std::collections::{HashMap, HashSet};
7
8use crate::prelude::*;
9use crate::variant::{Variant, VariantClass};
10use cloudillo_types::blob_adapter::CreateBlobOptions;
11use cloudillo_types::hasher;
12use cloudillo_types::meta_adapter::{CreateFile, FileId, FileVariant};
13use cloudillo_types::types::ApiResponse;
14
/// Outcome of a single file synchronization run.
///
/// Each list holds variant names (for example `"tn"` or `"sd"`), not variant IDs.
#[derive(Debug, Default)]
pub struct SyncResult {
    /// ID of the file that was synchronized.
    pub file_id: String,
    /// Variants whose content was fetched from the remote and stored locally.
    pub synced_variants: Vec<String>,
    /// Variants for which no content was fetched: the variant record or blob
    /// already existed locally, or the variant was recorded as metadata only.
    pub skipped_variants: Vec<String>,
    /// Variants whose content could not be fetched, verified or stored.
    pub failed_variants: Vec<String>,
}
23
/// Known quality levels, ordered from lowest to highest.
const VARIANT_ORDER: &[&str] = &["tn", "pf", "sd", "md", "hd", "xd"];

/// Returns `true` when `variant` ranks at or below `max_variant` in
/// [`VARIANT_ORDER`].
///
/// A name that is not listed in [`VARIANT_ORDER`] ranks above every known
/// level. Consequently an unknown `max_variant` admits every variant, while an
/// unknown `variant` is admitted only when `max_variant` is unknown as well.
fn should_sync_variant(variant: &str, max_variant: &str) -> bool {
    let rank = |name: &str| {
        VARIANT_ORDER
            .iter()
            .position(|&known| known == name)
            .unwrap_or(usize::MAX)
    };

    rank(variant) <= rank(max_variant)
}
33
34pub fn get_sync_setting_key(class: VariantClass) -> Option<&'static str> {
36 match class {
37 VariantClass::Visual => Some("file.sync_max_vis"),
38 VariantClass::Video => Some("file.sync_max_vid"),
39 VariantClass::Audio => Some("file.sync_max_aud"),
40 VariantClass::Document | VariantClass::Raw => None, }
42}
43
44pub fn get_default_sync_max(class: VariantClass) -> &'static str {
46 match class {
47 VariantClass::Visual => "md",
48 VariantClass::Video => "sd",
49 VariantClass::Audio => "md",
50 VariantClass::Document | VariantClass::Raw => "orig", }
52}
53
/// Picks the identity tag to sync from.
///
/// Returns `audience_tag` when it is present and differs from `tenant_tag`;
/// in every other case (no audience, or the audience is this tenant) returns
/// `issuer_tag`.
pub fn get_sync_source<'a>(
    tenant_tag: &str,
    audience_tag: Option<&'a str>,
    issuer_tag: &'a str,
) -> &'a str {
    audience_tag
        .filter(|aud| *aud != tenant_tag)
        .unwrap_or(issuer_tag)
}
68
/// Returns `true` only when `audience_tag` is present and equal to
/// `tenant_tag`.
pub fn is_audience(tenant_tag: &str, audience_tag: Option<&str>) -> bool {
    matches!(audience_tag, Some(aud) if aud == tenant_tag)
}
73
74fn verify_content_hash(data: &[u8], expected_id: &str) -> ClResult<()> {
79 let prefix = expected_id
81 .find('1')
82 .map(|pos| &expected_id[..pos])
83 .ok_or_else(|| Error::ValidationError(format!("Invalid ID format: {}", expected_id)))?;
84
85 let computed_id = hasher::hash(prefix, data);
87
88 if computed_id.as_ref() != expected_id {
89 return Err(Error::ValidationError(format!(
90 "Hash mismatch: expected {}, got {}",
91 expected_id, computed_id
92 )));
93 }
94
95 Ok(())
96}
97
98pub async fn sync_file_variants(
111 app: &App,
112 tn_id: TnId,
113 remote_id_tag: &str,
114 file_id: &str,
115 variants: Option<&[&str]>,
116 auth: bool,
117) -> ClResult<SyncResult> {
118 let mut result = SyncResult { file_id: file_id.to_string(), ..Default::default() };
119
120 debug!("Syncing file {} from {}", file_id, remote_id_tag);
121
122 let descriptor_path = format!("/files/{}/descriptor", file_id);
124 let descriptor_response: ApiResponse<String> = if auth {
125 app.request.get(tn_id, remote_id_tag, &descriptor_path).await?
126 } else {
127 app.request.get_noauth(tn_id, remote_id_tag, &descriptor_path).await?
128 };
129 let descriptor = &descriptor_response.data;
130
131 debug!(" fetched descriptor: {}", descriptor);
132
133 let mut parsed_variants = super::descriptor::parse_file_descriptor(descriptor)?;
135
136 if let Err(e) = verify_content_hash(descriptor.as_bytes(), file_id) {
138 parsed_variants.sort();
140 let local_descriptor = super::descriptor::get_file_descriptor(&parsed_variants);
141 warn!(
142 "Descriptor hash mismatch for {}:\n fetched: {}\n generated: {}\n error: {}",
143 file_id, descriptor, local_descriptor, e
144 );
145 return Err(e);
146 }
147
148 if parsed_variants.is_empty() {
149 warn!("No variants found in descriptor for {}", file_id);
150 return Ok(result);
151 }
152
153 let variants_to_sync: Vec<_> = if let Some(explicit_variants) = variants {
155 parsed_variants
157 .iter()
158 .filter(|v| explicit_variants.contains(&v.variant))
159 .collect()
160 } else {
161 let mut class_max_variants: HashMap<VariantClass, String> = HashMap::new();
163
164 for variant in &parsed_variants {
166 let class =
167 Variant::parse(variant.variant).map(|v| v.class).unwrap_or(VariantClass::Visual);
168
169 if let std::collections::hash_map::Entry::Vacant(e) = class_max_variants.entry(class) {
170 if let Some(setting_key) = get_sync_setting_key(class) {
171 let max_variant = app
172 .settings
173 .get_string_opt(tn_id, setting_key)
174 .await
175 .ok()
176 .flatten()
177 .unwrap_or_else(|| get_default_sync_max(class).to_string());
178 e.insert(max_variant);
179 }
180 }
182 }
183
184 parsed_variants
186 .iter()
187 .filter(|v| {
188 let class =
189 Variant::parse(v.variant).map(|vp| vp.class).unwrap_or(VariantClass::Visual);
190
191 let Some(max_variant) = class_max_variants.get(&class).map(|s| s.as_str()) else {
193 return true; };
195
196 let quality =
197 Variant::parse(v.variant).map(|vp| vp.quality.as_str()).unwrap_or(v.variant);
198
199 should_sync_variant(quality, max_variant)
200 })
201 .collect()
202 };
203
204 let variants_to_sync_set: HashSet<&str> = variants_to_sync.iter().map(|v| v.variant).collect();
206
207 debug!(
208 "Variants to sync content for: {:?}, total variants: {}",
209 variants_to_sync_set,
210 parsed_variants.len()
211 );
212
213 let existing_f_id = app.meta_adapter.read_f_id_by_file_id(tn_id, file_id).await.ok();
215
216 let existing_variants: Vec<String> = if existing_f_id.is_some() {
218 app.meta_adapter
219 .list_file_variants(tn_id, cloudillo_types::meta_adapter::FileId::FileId(file_id))
220 .await
221 .map(|v| v.iter().map(|fv| fv.variant.to_string()).collect())
222 .unwrap_or_default()
223 } else {
224 vec![]
225 };
226
227 let (f_id, is_new_file): (Option<u64>, bool) = if let Some(f_id) = existing_f_id {
228 debug!("File {} already exists (f_id={}), syncing missing variants", file_id, f_id);
230 (Some(f_id), false)
231 } else {
232 let first_variant = &parsed_variants[0];
236 let create_opts = CreateFile {
237 orig_variant_id: Some(first_variant.variant_id.into()),
238 file_id: Some(file_id.into()), parent_id: None,
240 owner_tag: None, creator_tag: None,
242 preset: Some("sync".into()),
243 content_type: format_to_content_type(first_variant.format).into(),
244 file_name: format!("synced.{}", format_to_extension(first_variant.format)).into(),
245 file_tp: None,
246 created_at: Some(Timestamp::now()),
247 tags: None,
248 x: None,
249 visibility: Some('D'), status: None, };
252
253 match app.meta_adapter.create_file(tn_id, create_opts).await {
254 Ok(FileId::FId(f_id)) => (Some(f_id), true),
255 Ok(FileId::FileId(_)) => {
256 debug!("File {} matched existing by orig_variant_id", file_id);
258 (None, false)
259 }
260 Err(e) => {
261 warn!("Failed to create file entry for {}: {}", file_id, e);
262 return Err(e);
263 }
264 }
265 };
266
267 for variant in &parsed_variants {
270 let variant_id = variant.variant_id;
271 let variant_name = variant.variant;
272 let should_sync_content = variants_to_sync_set.contains(variant_name);
273
274 let variant_record_exists = existing_variants.iter().any(|v| v == variant_name);
276
277 if variant_record_exists {
278 debug!(" variant {} record already exists, skipping", variant_name);
280 result.skipped_variants.push(variant_name.to_string());
281 continue;
282 }
283
284 let (blob_size, available) = if should_sync_content {
286 let blob_exists = app.blob_adapter.stat_blob(tn_id, variant_id).await.is_some();
288
289 if blob_exists {
290 let size = app.blob_adapter.stat_blob(tn_id, variant_id).await.unwrap_or(0);
292 debug!(" variant {} blob already exists", variant_name);
293 result.skipped_variants.push(variant_name.to_string());
294 (size, true)
295 } else {
296 match fetch_and_store_blob(
298 app,
299 tn_id,
300 remote_id_tag,
301 variant_id,
302 variant_name,
303 auth,
304 )
305 .await
306 {
307 Ok(size) => {
308 info!(" synced variant {} ({})", variant_name, variant_id);
309 result.synced_variants.push(variant_name.to_string());
310 (size, true)
311 }
312 Err(e) => {
313 warn!(" failed to sync variant {}: {}", variant_name, e);
314 result.failed_variants.push(variant_name.to_string());
315 continue; }
317 }
318 }
319 } else {
320 debug!(" variant {} metadata-only (not syncing content)", variant_name);
322 result.skipped_variants.push(variant_name.to_string());
323 (variant.size, false) };
325
326 if let Some(f_id) = f_id {
328 let file_variant = FileVariant {
329 variant_id,
330 variant: variant_name,
331 format: variant.format,
332 resolution: variant.resolution,
333 size: blob_size,
334 available,
335 duration: None,
336 bitrate: None,
337 page_count: None,
338 };
339
340 if let Err(e) = app.meta_adapter.create_file_variant(tn_id, f_id, file_variant).await {
341 warn!(" failed to create variant record for {}: {}", variant_name, e);
342 }
343 }
344 }
345
346 if is_new_file {
348 if let Some(f_id) = f_id {
349 if let Err(e) = app.meta_adapter.finalize_file(tn_id, f_id, file_id).await {
350 warn!("Failed to finalize file {}: {}", file_id, e);
351 }
353 }
354 }
355
356 info!(
357 "File sync complete for {}: {} synced, {} skipped, {} failed",
358 file_id,
359 result.synced_variants.len(),
360 result.skipped_variants.len(),
361 result.failed_variants.len()
362 );
363
364 Ok(result)
365}
366
367async fn fetch_and_store_blob(
371 app: &App,
372 tn_id: TnId,
373 remote_id_tag: &str,
374 variant_id: &str,
375 variant_name: &str,
376 auth: bool,
377) -> ClResult<u64> {
378 let variant_path = format!("/files/variant/{}", variant_id);
379 let bytes = app.request.get_bin(tn_id, remote_id_tag, &variant_path, auth).await?;
380
381 if bytes.is_empty() {
382 warn!(" variant {} returned empty data", variant_name);
383 return Err(Error::ValidationError("empty variant data".into()));
384 }
385
386 verify_content_hash(&bytes, variant_id).map_err(|e| {
388 error!(" variant {} hash mismatch: {}", variant_name, e);
389 e
390 })?;
391
392 let blob_size = bytes.len() as u64;
393
394 app.blob_adapter
396 .create_blob_buf(tn_id, variant_id, &bytes, &CreateBlobOptions::default())
397 .await
398 .map_err(|e| {
399 warn!(" failed to store blob for variant {}: {}", variant_name, e);
400 e
401 })?;
402
403 Ok(blob_size)
404}
405
/// Maps an image format name to its MIME type, ignoring letter case.
///
/// Recognizes `webp`, `avif`, `jpeg`/`jpg`, `png` and `gif`; any other name
/// yields `application/octet-stream`.
fn format_to_content_type(format: &str) -> &'static str {
    const MIME_BY_FORMAT: [(&str, &str); 6] = [
        ("webp", "image/webp"),
        ("avif", "image/avif"),
        ("jpeg", "image/jpeg"),
        ("jpg", "image/jpeg"),
        ("png", "image/png"),
        ("gif", "image/gif"),
    ];

    let normalized = format.to_lowercase();
    MIME_BY_FORMAT
        .iter()
        .find(|(name, _)| *name == normalized)
        .map_or("application/octet-stream", |&(_, mime)| mime)
}
417
/// Maps an image format name to a file extension, ignoring letter case.
///
/// `jpeg` and `jpg` both yield `jpg`; `webp`, `avif`, `png` and `gif` map to
/// themselves; any other name yields `bin`.
fn format_to_extension(format: &str) -> &'static str {
    let normalized = format.to_lowercase();
    let name = normalized.as_str();

    if name == "jpeg" || name == "jpg" {
        return "jpg";
    }

    // The remaining known formats use their own name as the extension.
    ["webp", "avif", "png", "gif"]
        .iter()
        .copied()
        .find(|known| *known == name)
        .unwrap_or("bin")
}
429
430