Skip to main content

typub_storage/
upload.rs

//! Asset upload orchestration.
//!
//! Per [[RFC-0004:C-PIPELINE-INTEGRATION]].
4
5use anyhow::{Context, Result};
6use std::collections::HashMap;
7use std::path::Path;
8use typub_config::StorageConfig;
9use typub_core::AssetStrategy;
10use typub_log::{debug, info};
11
12use crate::deferred::{DeferredAssets, PendingAssetList};
13use crate::s3::S3Storage;
14use crate::status::{AssetUploadRecord, StatusTracker};
15use crate::url_mapping::key_candidates;
16
17/// Upload all pending assets with caching support.
18///
19/// Per [[RFC-0004:C-UPLOAD-TRACKING]].
20///
21/// # Arguments
22///
23/// * `storage` - S3-compatible storage client
24/// * `pending` - List of pending assets to upload
25/// * `status` - Status tracker for caching upload records
26///
27/// # Returns
28///
29/// A HashMap mapping asset index to remote URL.
30pub async fn upload_pending_assets(
31    storage: &S3Storage,
32    pending: &PendingAssetList,
33    status: &StatusTracker,
34) -> Result<HashMap<usize, String>> {
35    let mut url_map = HashMap::new();
36
37    for asset in &pending.assets {
38        let url = upload_asset_with_cache(storage, &asset.local_path, status).await?;
39        url_map.insert(asset.index, url);
40    }
41
42    Ok(url_map)
43}
44
45/// Upload a single asset with caching support.
46///
47/// Checks the status tracker for an existing upload record (by content hash).
48/// If found and the storage config matches, returns the cached URL.
49/// Otherwise, uploads the asset and records it.
50async fn upload_asset_with_cache(
51    storage: &S3Storage,
52    local_path: &Path,
53    status: &StatusTracker,
54) -> Result<String> {
55    // Read file data
56    let data = std::fs::read(local_path)
57        .with_context(|| format!("Failed to read asset: {}", local_path.display()))?;
58
59    // Compute content hash and extension
60    let content_hash = S3Storage::compute_hash(&data);
61    let extension = S3Storage::normalize_extension(local_path);
62
63    // Check content index for cache hit
64    if let Some(record) =
65        status.get_asset_by_content(storage.config_id(), &content_hash, &extension)?
66    {
67        debug!(
68            asset = %local_path.display(),
69            hash = &content_hash[..8],
70            "Cache hit for asset"
71        );
72        return Ok(record.remote_url);
73    }
74
75    // Upload to S3
76    debug!(asset = %local_path.display(), "Uploading asset to S3");
77    let result = storage.upload(local_path, &data).await?;
78
79    // Record upload with normalized relative path per [[RFC-0005:C-PROJECT-ROOT]]
80    let normalized_path = status.normalize_path(local_path)?;
81    let record = AssetUploadRecord {
82        local_path: normalized_path,
83        content_hash: result.content_hash.clone(),
84        extension: result.extension.clone(),
85        storage_config_id: storage.config_id().to_string(),
86        remote_key: result.remote_key.clone(),
87        remote_url: result.remote_url.clone(),
88        uploaded_at: chrono::Utc::now().to_rfc3339(),
89    };
90    status.record_asset_upload(&record)?;
91
92    Ok(result.remote_url)
93}
94
95/// Materialize assets for External strategy (without caching).
96///
97/// This version uploads assets without StatusTracker caching support,
98/// suitable for adapter subcrates that don't have access to StatusTracker.
99///
100/// Per [[RFC-0004:C-PIPELINE-INTEGRATION]] v0.2.0 and [[RFC-0007:C-ADAPTER-CRATE]].
101///
102/// # Arguments
103///
104/// * `assets` - Mutable reference to DeferredAssets (will be populated with resolved URLs)
105/// * `storage_config` - S3-compatible storage configuration
106///
107/// # Returns
108///
109/// `Ok(())` on success, with `assets.resolved` populated.
110pub async fn materialize_external_assets(
111    assets: &mut DeferredAssets,
112    storage_config: &StorageConfig,
113) -> Result<()> {
114    if !assets.needs_materialize() {
115        return Ok(());
116    }
117
118    if assets.strategy != AssetStrategy::External {
119        return Ok(()); // Only handle External strategy
120    }
121
122    let storage = S3Storage::new(storage_config)?;
123
124    info!(
125        count = assets.pending.assets.len(),
126        "[1/2] Uploading assets to external storage"
127    );
128
129    let url_map = upload_pending_assets_uncached(&storage, &assets.pending).await?;
130    assets.resolved = url_map;
131
132    info!("[2/2] Assets uploaded and URLs resolved");
133
134    Ok(())
135}
136
137/// Materialize assets for External strategy (with caching).
138///
139/// This version uses StatusTracker for caching upload records, avoiding
140/// re-uploads of assets that were previously uploaded with the same content hash.
141///
142/// Per [[RFC-0004:C-PIPELINE-INTEGRATION]] v0.2.0.
143///
144/// # Arguments
145///
146/// * `assets` - Mutable reference to DeferredAssets (will be populated with resolved URLs)
147/// * `storage_config` - S3-compatible storage configuration
148/// * `status` - Status tracker for caching upload records
149///
150/// # Returns
151///
152/// `Ok(())` on success, with `assets.resolved` populated.
153pub async fn materialize_external_assets_with_status(
154    assets: &mut DeferredAssets,
155    storage_config: &StorageConfig,
156    status: &StatusTracker,
157) -> Result<()> {
158    if !assets.needs_materialize() {
159        return Ok(());
160    }
161
162    if assets.strategy != AssetStrategy::External {
163        return Ok(()); // Only handle External strategy
164    }
165
166    let storage = S3Storage::new(storage_config)?;
167
168    info!(
169        count = assets.pending.assets.len(),
170        "[1/2] Uploading assets to external storage"
171    );
172
173    let url_map = upload_pending_assets(&storage, &assets.pending, status).await?;
174    assets.resolved = url_map;
175
176    info!("[2/2] Assets uploaded and URLs resolved");
177
178    Ok(())
179}
180
181/// Upload all pending assets without caching.
182///
183/// This is a simpler version of `upload_pending_assets` that doesn't use
184/// StatusTracker for cache lookups or recording. Suitable for adapter subcrates.
185async fn upload_pending_assets_uncached(
186    storage: &S3Storage,
187    pending: &PendingAssetList,
188) -> Result<HashMap<usize, String>> {
189    let mut url_map = HashMap::new();
190
191    for asset in &pending.assets {
192        let url = upload_asset_uncached(storage, &asset.local_path).await?;
193        url_map.insert(asset.index, url);
194    }
195
196    Ok(url_map)
197}
198
199/// Upload a single asset without caching.
200async fn upload_asset_uncached(storage: &S3Storage, local_path: &Path) -> Result<String> {
201    // Read file data
202    let data = std::fs::read(local_path)
203        .with_context(|| format!("Failed to read asset: {}", local_path.display()))?;
204
205    // Upload to S3
206    debug!(asset = %local_path.display(), "Uploading asset to S3");
207    let result = storage.upload(local_path, &data).await?;
208
209    Ok(result.remote_url)
210}
211
212/// Build a map from asset reference paths to resolved remote URLs.
213///
214/// This converts the index-based `resolved` map in `DeferredAssets` into a
215/// path-based map suitable for `resolve_asset_urls()`.
216///
217/// Per [[RFC-0004:C-PIPELINE-INTEGRATION]], this bridges the gap between
218/// the upload system (which uses indices) and the IR modification system
219/// (which uses path strings to match asset source paths).
220pub fn build_resolved_url_map(
221    assets: &DeferredAssets,
222    content_path: &Path,
223) -> HashMap<String, String> {
224    let mut url_map = HashMap::new();
225    for asset in &assets.pending.assets {
226        if let Some(url) = assets.resolved.get(&asset.index) {
227            // Add the original_ref as-is
228            url_map.insert(asset.original_ref.clone(), url.clone());
229
230            // Also add candidate key variants so resolve_image_reference_url can match
231            for candidate in key_candidates(&asset.original_ref) {
232                url_map.entry(candidate).or_insert_with(|| url.clone());
233            }
234
235            // Add local_path-based variants (relative to content_path)
236            if let Ok(rel) = asset.local_path.strip_prefix(content_path) {
237                let rel_str = rel.to_string_lossy().replace('\\', "/");
238                url_map.entry(rel_str).or_insert_with(|| url.clone());
239            }
240
241            // Add the full local_path as a key (for temp files with absolute paths)
242            let local_path_str = asset.local_path.to_string_lossy().replace('\\', "/");
243            url_map.entry(local_path_str).or_insert_with(|| url.clone());
244        }
245    }
246    url_map
247}
248
#[cfg(test)]
mod tests {
    #![allow(clippy::expect_used)]
    use super::*;
    use crate::deferred::{PendingAsset, PendingAssetList};
    use std::path::PathBuf;

    /// With no pending assets there is nothing to map.
    #[test]
    fn test_build_resolved_url_map_empty() {
        let assets = DeferredAssets::empty();
        let content_path = PathBuf::from("/project/content/my-post");
        let map = build_resolved_url_map(&assets, &content_path);
        assert!(map.is_empty());
    }

    /// A resolved asset is reachable through its original reference and
    /// through its full local path.
    #[test]
    fn test_build_resolved_url_map_with_resolved() {
        let remote_url = "https://cdn.example.com/abc123.png".to_string();

        let pending = PendingAssetList {
            assets: vec![PendingAsset {
                index: 0,
                local_path: PathBuf::from("/project/content/my-post/image.png"),
                original_ref: "image.png".to_string(),
            }],
        };
        let mut assets = DeferredAssets::new(pending, AssetStrategy::External);
        assets.resolved.insert(0, remote_url.clone());

        let content_path = PathBuf::from("/project/content/my-post");
        let map = build_resolved_url_map(&assets, &content_path);

        assert_eq!(map.get("image.png"), Some(&remote_url));
        // The full local path is always registered as a key as well.
        assert_eq!(
            map.get("/project/content/my-post/image.png"),
            Some(&remote_url)
        );
    }
}
287
// ============================================================================
// Asset Analysis (shared by dry-run and real publish)
// ============================================================================
291
/// Per-asset entry of an [`AssetAnalysis`].
///
/// Describes one pending asset: where it lives, how large it is, its content
/// hash, and whether a previous upload of the same content was found.
#[derive(Debug, Clone)]
pub struct AssetInfo {
    /// Asset index in the pending list.
    pub index: usize,
    /// Path to the asset file.
    pub path: std::path::PathBuf,
    /// Size of the asset in bytes.
    pub size_bytes: u64,
    /// SHA256 content hash (full 64-char hex string).
    pub content_hash: String,
    /// Whether this is a new asset (not cached).
    pub is_new: bool,
    /// Cached remote URL (only set if `is_new` is false).
    pub cached_url: Option<String>,
}
308
309/// Result of asset analysis.
310///
311/// This provides detailed information about which assets will be uploaded
312/// and which will use cached URLs. Used by both dry-run and real publish.
313#[derive(Debug, Clone)]
314pub struct AssetAnalysis {
315    /// Total number of assets.
316    pub total_count: usize,
317    /// Number of new assets (will be uploaded).
318    pub new_count: usize,
319    /// Number of cached assets (will use existing URLs).
320    pub cached_count: usize,
321    /// Total size of new assets in bytes.
322    pub new_size_bytes: u64,
323    /// Total size of cached assets in bytes.
324    pub cached_size_bytes: u64,
325    /// Detailed information for each asset.
326    pub assets: Vec<AssetInfo>,
327}
328
329impl AssetAnalysis {
330    /// Create an empty analysis result.
331    pub fn empty() -> Self {
332        Self {
333            total_count: 0,
334            new_count: 0,
335            cached_count: 0,
336            new_size_bytes: 0,
337            cached_size_bytes: 0,
338            assets: Vec::new(),
339        }
340    }
341
342    /// Get human-readable size string.
343    pub fn format_size(bytes: u64) -> String {
344        const KB: u64 = 1024;
345        const MB: u64 = KB * 1024;
346        const GB: u64 = MB * 1024;
347
348        if bytes >= GB {
349            format!("{:.1} GB", bytes as f64 / GB as f64)
350        } else if bytes >= MB {
351            format!("{:.1} MB", bytes as f64 / MB as f64)
352        } else if bytes >= KB {
353            format!("{:.1} KB", bytes as f64 / KB as f64)
354        } else {
355            format!("{} B", bytes)
356        }
357    }
358}
359
360/// Analyze assets without materializing.
361///
362/// This function:
363/// 1. Computes SHA256 hash for each pending asset
364/// 2. Checks StatusTracker for cached URLs
365/// 3. Returns detailed analysis of new vs cached assets
366///
367/// If StatusTracker is not available, all assets are treated as new.
368///
369/// # Arguments
370///
371/// * `assets` - The deferred assets to analyze
372/// * `storage_config_id` - The storage config ID for cache lookup
373/// * `tracker` - Optional status tracker for cache lookup
374pub fn analyze_assets(
375    assets: &DeferredAssets,
376    storage_config_id: &str,
377    tracker: Option<&StatusTracker>,
378) -> Result<AssetAnalysis> {
379    use sha2::{Digest, Sha256};
380
381    if !assets.needs_materialize() {
382        return Ok(AssetAnalysis::empty());
383    }
384
385    let pending = &assets.pending;
386
387    let mut analysis = AssetAnalysis {
388        total_count: pending.assets.len(),
389        new_count: 0,
390        cached_count: 0,
391        new_size_bytes: 0,
392        cached_size_bytes: 0,
393        assets: Vec::with_capacity(pending.assets.len()),
394    };
395
396    for asset in &pending.assets {
397        // Read file and compute hash
398        let data = std::fs::read(&asset.local_path)
399            .with_context(|| format!("Failed to read asset: {}", asset.local_path.display()))?;
400
401        let size_bytes = data.len() as u64;
402        let mut hasher = Sha256::new();
403        hasher.update(&data);
404        let content_hash = hex::encode(hasher.finalize());
405
406        let extension = std::path::Path::new(&asset.local_path)
407            .extension()
408            .and_then(|e| e.to_str())
409            .map(|e| e.to_lowercase())
410            .unwrap_or_default();
411
412        // Check cache if tracker available
413        let (is_new, cached_url) = if let Some(t) = tracker {
414            match t.get_asset_by_content(storage_config_id, &content_hash, &extension) {
415                Ok(Some(record)) => (false, Some(record.remote_url)),
416                _ => (true, None),
417            }
418        } else {
419            (true, None)
420        };
421
422        let info = AssetInfo {
423            index: asset.index,
424            path: asset.local_path.clone(),
425            size_bytes,
426            content_hash,
427            is_new,
428            cached_url,
429        };
430
431        if info.is_new {
432            analysis.new_count += 1;
433            analysis.new_size_bytes += size_bytes;
434        } else {
435            analysis.cached_count += 1;
436            analysis.cached_size_bytes += size_bytes;
437        }
438
439        analysis.assets.push(info);
440    }
441
442    Ok(analysis)
443}
444
445/// Materialize assets with optional mock mode.
446///
447/// If `mock: true`: generates mock URLs without file I/O (for dry-run)
448/// If `mock: false`: actually uploads to S3 (for real publish)
449///
450/// The `mock_url_prefix` is used to generate mock URLs when `mock: true`.
451/// If not provided, defaults to `https://mock-cdn.example.com`.
452/// For External strategy, this can use the storage's url_prefix.
453/// For Upload strategy (adapter-specific), adapters pass their own prefix.
454///
455/// Returns AssetAnalysis for UI logging.
456pub async fn materialize_with_analysis(
457    assets: &mut DeferredAssets,
458    storage_config: &StorageConfig,
459    tracker: Option<&StatusTracker>,
460    mock: bool,
461    mock_url_prefix: Option<&str>,
462) -> Result<AssetAnalysis> {
463    // 1. Always analyze first (compute hashes, check cache)
464    let config_id = storage_config.config_id();
465    let analysis = analyze_assets(assets, &config_id, tracker)?;
466
467    if analysis.total_count == 0 {
468        return Ok(analysis);
469    }
470
471    // 2. Either mock or real upload
472    if mock {
473        // Generate mock URLs (no file I/O)
474        let prefix = mock_url_prefix.unwrap_or("https://mock-cdn.example.com");
475        for info in &analysis.assets {
476            let extension = info
477                .path
478                .extension()
479                .and_then(|e| e.to_str())
480                .unwrap_or("bin");
481            let stem = info
482                .path
483                .file_stem()
484                .and_then(|s| s.to_str())
485                .unwrap_or("asset");
486
487            let mock_url = format!(
488                "{}/{}/{}.{}",
489                prefix,
490                &info.content_hash[..8],
491                stem,
492                extension
493            );
494            assets.resolved.insert(info.index, mock_url);
495        }
496        info!(
497            "Dry-run: {} asset(s) analyzed ({} new, {} cached)",
498            analysis.total_count, analysis.new_count, analysis.cached_count
499        );
500    } else {
501        // Real S3 upload
502        let storage = S3Storage::new(storage_config)?;
503
504        if analysis.new_count > 0 {
505            info!(
506                "Uploading {} new asset(s) to external storage...",
507                analysis.new_count
508            );
509        }
510
511        let url_map = if let Some(t) = tracker {
512            upload_pending_assets(&storage, &assets.pending, t).await?
513        } else {
514            // Without tracker, we can't use caching - upload each asset
515            let mut url_map = std::collections::HashMap::new();
516            for asset in &assets.pending.assets {
517                let data = std::fs::read(&asset.local_path).with_context(|| {
518                    format!("Failed to read asset: {}", asset.local_path.display())
519                })?;
520                let result = storage.upload(&asset.local_path, &data).await?;
521                url_map.insert(asset.index, result.remote_url);
522            }
523            url_map
524        };
525        assets.resolved = url_map;
526
527        info!(
528            "Assets uploaded: {} new, {} cached",
529            analysis.new_count, analysis.cached_count
530        );
531    }
532
533    Ok(analysis)
534}