1use anyhow::{Context, Result};
6use std::collections::HashMap;
7use std::path::Path;
8use typub_config::StorageConfig;
9use typub_core::AssetStrategy;
10use typub_log::{debug, info};
11
12use crate::deferred::{DeferredAssets, PendingAssetList};
13use crate::s3::S3Storage;
14use crate::status::{AssetUploadRecord, StatusTracker};
15use crate::url_mapping::key_candidates;
16
17pub async fn upload_pending_assets(
31 storage: &S3Storage,
32 pending: &PendingAssetList,
33 status: &StatusTracker,
34) -> Result<HashMap<usize, String>> {
35 let mut url_map = HashMap::new();
36
37 for asset in &pending.assets {
38 let url = upload_asset_with_cache(storage, &asset.local_path, status).await?;
39 url_map.insert(asset.index, url);
40 }
41
42 Ok(url_map)
43}
44
45async fn upload_asset_with_cache(
51 storage: &S3Storage,
52 local_path: &Path,
53 status: &StatusTracker,
54) -> Result<String> {
55 let data = std::fs::read(local_path)
57 .with_context(|| format!("Failed to read asset: {}", local_path.display()))?;
58
59 let content_hash = S3Storage::compute_hash(&data);
61 let extension = S3Storage::normalize_extension(local_path);
62
63 if let Some(record) =
65 status.get_asset_by_content(storage.config_id(), &content_hash, &extension)?
66 {
67 debug!(
68 asset = %local_path.display(),
69 hash = &content_hash[..8],
70 "Cache hit for asset"
71 );
72 return Ok(record.remote_url);
73 }
74
75 debug!(asset = %local_path.display(), "Uploading asset to S3");
77 let result = storage.upload(local_path, &data).await?;
78
79 let normalized_path = status.normalize_path(local_path)?;
81 let record = AssetUploadRecord {
82 local_path: normalized_path,
83 content_hash: result.content_hash.clone(),
84 extension: result.extension.clone(),
85 storage_config_id: storage.config_id().to_string(),
86 remote_key: result.remote_key.clone(),
87 remote_url: result.remote_url.clone(),
88 uploaded_at: chrono::Utc::now().to_rfc3339(),
89 };
90 status.record_asset_upload(&record)?;
91
92 Ok(result.remote_url)
93}
94
95pub async fn materialize_external_assets(
111 assets: &mut DeferredAssets,
112 storage_config: &StorageConfig,
113) -> Result<()> {
114 if !assets.needs_materialize() {
115 return Ok(());
116 }
117
118 if assets.strategy != AssetStrategy::External {
119 return Ok(()); }
121
122 let storage = S3Storage::new(storage_config)?;
123
124 info!(
125 count = assets.pending.assets.len(),
126 "[1/2] Uploading assets to external storage"
127 );
128
129 let url_map = upload_pending_assets_uncached(&storage, &assets.pending).await?;
130 assets.resolved = url_map;
131
132 info!("[2/2] Assets uploaded and URLs resolved");
133
134 Ok(())
135}
136
137pub async fn materialize_external_assets_with_status(
154 assets: &mut DeferredAssets,
155 storage_config: &StorageConfig,
156 status: &StatusTracker,
157) -> Result<()> {
158 if !assets.needs_materialize() {
159 return Ok(());
160 }
161
162 if assets.strategy != AssetStrategy::External {
163 return Ok(()); }
165
166 let storage = S3Storage::new(storage_config)?;
167
168 info!(
169 count = assets.pending.assets.len(),
170 "[1/2] Uploading assets to external storage"
171 );
172
173 let url_map = upload_pending_assets(&storage, &assets.pending, status).await?;
174 assets.resolved = url_map;
175
176 info!("[2/2] Assets uploaded and URLs resolved");
177
178 Ok(())
179}
180
181async fn upload_pending_assets_uncached(
186 storage: &S3Storage,
187 pending: &PendingAssetList,
188) -> Result<HashMap<usize, String>> {
189 let mut url_map = HashMap::new();
190
191 for asset in &pending.assets {
192 let url = upload_asset_uncached(storage, &asset.local_path).await?;
193 url_map.insert(asset.index, url);
194 }
195
196 Ok(url_map)
197}
198
199async fn upload_asset_uncached(storage: &S3Storage, local_path: &Path) -> Result<String> {
201 let data = std::fs::read(local_path)
203 .with_context(|| format!("Failed to read asset: {}", local_path.display()))?;
204
205 debug!(asset = %local_path.display(), "Uploading asset to S3");
207 let result = storage.upload(local_path, &data).await?;
208
209 Ok(result.remote_url)
210}
211
212pub fn build_resolved_url_map(
221 assets: &DeferredAssets,
222 content_path: &Path,
223) -> HashMap<String, String> {
224 let mut url_map = HashMap::new();
225 for asset in &assets.pending.assets {
226 if let Some(url) = assets.resolved.get(&asset.index) {
227 url_map.insert(asset.original_ref.clone(), url.clone());
229
230 for candidate in key_candidates(&asset.original_ref) {
232 url_map.entry(candidate).or_insert_with(|| url.clone());
233 }
234
235 if let Ok(rel) = asset.local_path.strip_prefix(content_path) {
237 let rel_str = rel.to_string_lossy().replace('\\', "/");
238 url_map.entry(rel_str).or_insert_with(|| url.clone());
239 }
240
241 let local_path_str = asset.local_path.to_string_lossy().replace('\\', "/");
243 url_map.entry(local_path_str).or_insert_with(|| url.clone());
244 }
245 }
246 url_map
247}
248
#[cfg(test)]
mod tests {
    #![allow(clippy::expect_used)]
    use super::*;
    use crate::deferred::{PendingAsset, PendingAssetList};
    use std::path::PathBuf;

    #[test]
    fn test_build_resolved_url_map_empty() {
        let assets = DeferredAssets::empty();
        let content_path = PathBuf::from("/project/content/my-post");
        let map = build_resolved_url_map(&assets, &content_path);
        assert!(map.is_empty());
    }

    #[test]
    fn test_build_resolved_url_map_with_resolved() {
        let pending = PendingAssetList {
            assets: vec![PendingAsset {
                index: 0,
                local_path: PathBuf::from("/project/content/my-post/image.png"),
                original_ref: "image.png".to_string(),
            }],
        };
        let mut assets = DeferredAssets::new(pending, AssetStrategy::External);
        assets
            .resolved
            .insert(0, "https://cdn.example.com/abc123.png".to_string());

        let content_path = PathBuf::from("/project/content/my-post");
        let map = build_resolved_url_map(&assets, &content_path);

        assert_eq!(
            map.get("image.png"),
            Some(&"https://cdn.example.com/abc123.png".to_string())
        );
    }

    /// Pending assets with no resolved URL must not contribute any keys.
    #[test]
    fn test_build_resolved_url_map_skips_unresolved() {
        let pending = PendingAssetList {
            assets: vec![PendingAsset {
                index: 0,
                local_path: PathBuf::from("/project/content/my-post/image.png"),
                original_ref: "image.png".to_string(),
            }],
        };
        let assets = DeferredAssets::new(pending, AssetStrategy::External);

        let content_path = PathBuf::from("/project/content/my-post");
        let map = build_resolved_url_map(&assets, &content_path);

        assert!(map.is_empty());
    }

    /// Both the content-relative path and the full local path resolve to the URL.
    #[test]
    fn test_build_resolved_url_map_adds_path_keys() {
        let url = "https://cdn.example.com/def456.png".to_string();
        let pending = PendingAssetList {
            assets: vec![PendingAsset {
                index: 0,
                local_path: PathBuf::from("/project/content/my-post/img/photo.png"),
                original_ref: "img/photo.png".to_string(),
            }],
        };
        let mut assets = DeferredAssets::new(pending, AssetStrategy::External);
        assets.resolved.insert(0, url.clone());

        let content_path = PathBuf::from("/project/content/my-post");
        let map = build_resolved_url_map(&assets, &content_path);

        assert_eq!(map.get("img/photo.png"), Some(&url));
        assert_eq!(map.get("/project/content/my-post/img/photo.png"), Some(&url));
    }
}
287
/// Per-asset result of `analyze_assets`.
#[derive(Debug, Clone)]
pub struct AssetInfo {
    /// Index of the corresponding entry in the pending asset list.
    pub index: usize,
    /// Local filesystem path of the asset.
    pub path: std::path::PathBuf,
    /// Size of the file contents in bytes.
    pub size_bytes: u64,
    /// Content hash of the file bytes, as computed by `analyze_assets`.
    pub content_hash: String,
    /// `true` when no cached upload record was found for this content.
    pub is_new: bool,
    /// Remote URL from the cached upload record, if one was found.
    pub cached_url: Option<String>,
}
308
/// Aggregate summary of which pending assets are new versus already uploaded.
#[derive(Debug, Clone)]
pub struct AssetAnalysis {
    /// Number of pending assets analyzed.
    pub total_count: usize,
    /// Number of assets with no cached upload record.
    pub new_count: usize,
    /// Number of assets with a cached upload record.
    pub cached_count: usize,
    /// Total size in bytes of the new assets.
    pub new_size_bytes: u64,
    /// Total size in bytes of the cached assets.
    pub cached_size_bytes: u64,
    /// Per-asset details, in pending-list order.
    pub assets: Vec<AssetInfo>,
}
328
329impl AssetAnalysis {
330 pub fn empty() -> Self {
332 Self {
333 total_count: 0,
334 new_count: 0,
335 cached_count: 0,
336 new_size_bytes: 0,
337 cached_size_bytes: 0,
338 assets: Vec::new(),
339 }
340 }
341
342 pub fn format_size(bytes: u64) -> String {
344 const KB: u64 = 1024;
345 const MB: u64 = KB * 1024;
346 const GB: u64 = MB * 1024;
347
348 if bytes >= GB {
349 format!("{:.1} GB", bytes as f64 / GB as f64)
350 } else if bytes >= MB {
351 format!("{:.1} MB", bytes as f64 / MB as f64)
352 } else if bytes >= KB {
353 format!("{:.1} KB", bytes as f64 / KB as f64)
354 } else {
355 format!("{} B", bytes)
356 }
357 }
358}
359
360pub fn analyze_assets(
375 assets: &DeferredAssets,
376 storage_config_id: &str,
377 tracker: Option<&StatusTracker>,
378) -> Result<AssetAnalysis> {
379 use sha2::{Digest, Sha256};
380
381 if !assets.needs_materialize() {
382 return Ok(AssetAnalysis::empty());
383 }
384
385 let pending = &assets.pending;
386
387 let mut analysis = AssetAnalysis {
388 total_count: pending.assets.len(),
389 new_count: 0,
390 cached_count: 0,
391 new_size_bytes: 0,
392 cached_size_bytes: 0,
393 assets: Vec::with_capacity(pending.assets.len()),
394 };
395
396 for asset in &pending.assets {
397 let data = std::fs::read(&asset.local_path)
399 .with_context(|| format!("Failed to read asset: {}", asset.local_path.display()))?;
400
401 let size_bytes = data.len() as u64;
402 let mut hasher = Sha256::new();
403 hasher.update(&data);
404 let content_hash = hex::encode(hasher.finalize());
405
406 let extension = std::path::Path::new(&asset.local_path)
407 .extension()
408 .and_then(|e| e.to_str())
409 .map(|e| e.to_lowercase())
410 .unwrap_or_default();
411
412 let (is_new, cached_url) = if let Some(t) = tracker {
414 match t.get_asset_by_content(storage_config_id, &content_hash, &extension) {
415 Ok(Some(record)) => (false, Some(record.remote_url)),
416 _ => (true, None),
417 }
418 } else {
419 (true, None)
420 };
421
422 let info = AssetInfo {
423 index: asset.index,
424 path: asset.local_path.clone(),
425 size_bytes,
426 content_hash,
427 is_new,
428 cached_url,
429 };
430
431 if info.is_new {
432 analysis.new_count += 1;
433 analysis.new_size_bytes += size_bytes;
434 } else {
435 analysis.cached_count += 1;
436 analysis.cached_size_bytes += size_bytes;
437 }
438
439 analysis.assets.push(info);
440 }
441
442 Ok(analysis)
443}
444
445pub async fn materialize_with_analysis(
457 assets: &mut DeferredAssets,
458 storage_config: &StorageConfig,
459 tracker: Option<&StatusTracker>,
460 mock: bool,
461 mock_url_prefix: Option<&str>,
462) -> Result<AssetAnalysis> {
463 let config_id = storage_config.config_id();
465 let analysis = analyze_assets(assets, &config_id, tracker)?;
466
467 if analysis.total_count == 0 {
468 return Ok(analysis);
469 }
470
471 if mock {
473 let prefix = mock_url_prefix.unwrap_or("https://mock-cdn.example.com");
475 for info in &analysis.assets {
476 let extension = info
477 .path
478 .extension()
479 .and_then(|e| e.to_str())
480 .unwrap_or("bin");
481 let stem = info
482 .path
483 .file_stem()
484 .and_then(|s| s.to_str())
485 .unwrap_or("asset");
486
487 let mock_url = format!(
488 "{}/{}/{}.{}",
489 prefix,
490 &info.content_hash[..8],
491 stem,
492 extension
493 );
494 assets.resolved.insert(info.index, mock_url);
495 }
496 info!(
497 "Dry-run: {} asset(s) analyzed ({} new, {} cached)",
498 analysis.total_count, analysis.new_count, analysis.cached_count
499 );
500 } else {
501 let storage = S3Storage::new(storage_config)?;
503
504 if analysis.new_count > 0 {
505 info!(
506 "Uploading {} new asset(s) to external storage...",
507 analysis.new_count
508 );
509 }
510
511 let url_map = if let Some(t) = tracker {
512 upload_pending_assets(&storage, &assets.pending, t).await?
513 } else {
514 let mut url_map = std::collections::HashMap::new();
516 for asset in &assets.pending.assets {
517 let data = std::fs::read(&asset.local_path).with_context(|| {
518 format!("Failed to read asset: {}", asset.local_path.display())
519 })?;
520 let result = storage.upload(&asset.local_path, &data).await?;
521 url_map.insert(asset.index, result.remote_url);
522 }
523 url_map
524 };
525 assets.resolved = url_map;
526
527 info!(
528 "Assets uploaded: {} new, {} cached",
529 analysis.new_count, analysis.cached_count
530 );
531 }
532
533 Ok(analysis)
534}