Skip to main content

oxigdal_streaming/cloud/
object_store.rs

//! Cloud object storage abstractions: URL parsing, byte-range requests,
//! credentials, presigned URLs, multipart upload, and range coalescing.
3
4use std::collections::HashMap;
5use thiserror::Error;
6
// ─────────────────────────────────────────────────────────────────────────────
// CloudError
// ─────────────────────────────────────────────────────────────────────────────
10
11/// Errors produced by the cloud I/O layer.
12#[derive(Debug, Error, Clone, PartialEq, Eq)]
13pub enum CloudError {
14    /// The URL string could not be parsed.
15    #[error("invalid URL: {0}")]
16    InvalidUrl(String),
17
18    /// The URL scheme is not supported.
19    #[error("unsupported scheme: {0}")]
20    UnsupportedScheme(String),
21
22    /// Credentials are required but were not provided.
23    #[error("missing credentials")]
24    MissingCredentials,
25
26    /// The supplied credentials are malformed or expired.
27    #[error("invalid credentials: {0}")]
28    InvalidCredentials(String),
29
30    /// An error occurred while generating a presigned URL.
31    #[error("presign error: {0}")]
32    PresignError(String),
33
34    /// The requested byte range exceeds the object size.
35    #[error("range out of bounds: [{start}, {end}) vs size {size}")]
36    RangeOutOfBounds {
37        /// Start offset of the requested range.
38        start: u64,
39        /// End offset (exclusive) of the requested range.
40        end: u64,
41        /// Actual size of the object.
42        size: u64,
43    },
44}
45
// ─────────────────────────────────────────────────────────────────────────────
// CloudScheme / ObjectUrl
// ─────────────────────────────────────────────────────────────────────────────
49
/// The URL schemes this module knows how to interpret.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CloudScheme {
    /// `s3://` — Amazon S3.
    S3,
    /// `gs://` — Google Cloud Storage.
    Gs,
    /// `az://` or `abfs://` — Azure Blob Storage.
    Az,
    /// `http://` — plain HTTP.
    Http,
    /// `https://` — HTTP over TLS.
    Https,
}
64
65/// A parsed cloud object URL.
66#[derive(Debug, Clone, PartialEq, Eq)]
67pub struct ObjectUrl {
68    /// Scheme of the URL.
69    pub scheme: CloudScheme,
70    /// Bucket or container name.
71    pub bucket: String,
72    /// Object key / path within the bucket.
73    pub key: String,
74    /// AWS / GCS region (if present in the URL).
75    pub region: Option<String>,
76    /// Custom endpoint override.
77    pub endpoint: Option<String>,
78}
79
80impl ObjectUrl {
81    /// Parse a cloud URL string into an [`ObjectUrl`].
82    ///
83    /// Supported forms:
84    /// - `s3://bucket/key`
85    /// - `gs://bucket/key`
86    /// - `az://container/blob`
87    /// - `abfs://container@account.dfs.core.windows.net/path`
88    /// - `http://host/path`
89    /// - `https://host/path`
90    pub fn parse(url: &str) -> Result<Self, CloudError> {
91        let (scheme_str, rest) = url
92            .split_once("://")
93            .ok_or_else(|| CloudError::InvalidUrl(format!("no scheme separator in '{url}'")))?;
94
95        let scheme = match scheme_str.to_ascii_lowercase().as_str() {
96            "s3" => CloudScheme::S3,
97            "gs" => CloudScheme::Gs,
98            "az" | "abfs" => CloudScheme::Az,
99            "http" => CloudScheme::Http,
100            "https" => CloudScheme::Https,
101            other => return Err(CloudError::UnsupportedScheme(other.to_owned())),
102        };
103
104        match &scheme {
105            CloudScheme::Http | CloudScheme::Https => {
106                // For http(s) the "bucket" is the host and the "key" is the path.
107                let (host, path) = if let Some(idx) = rest.find('/') {
108                    (&rest[..idx], &rest[idx + 1..])
109                } else {
110                    (rest, "")
111                };
112                if host.is_empty() {
113                    return Err(CloudError::InvalidUrl(format!("no host in '{url}'")));
114                }
115                Ok(ObjectUrl {
116                    scheme,
117                    bucket: host.to_owned(),
118                    key: path.to_owned(),
119                    region: None,
120                    endpoint: None,
121                })
122            }
123            _ => {
124                // s3://, gs://, az://  →  bucket/key
125                let (bucket, key) = if let Some(idx) = rest.find('/') {
126                    (&rest[..idx], &rest[idx + 1..])
127                } else {
128                    (rest, "")
129                };
130                if bucket.is_empty() {
131                    return Err(CloudError::InvalidUrl(format!("no bucket in '{url}'")));
132                }
133                Ok(ObjectUrl {
134                    scheme,
135                    bucket: bucket.to_owned(),
136                    key: key.to_owned(),
137                    region: None,
138                    endpoint: None,
139                })
140            }
141        }
142    }
143
144    /// Convert the cloud URL to an HTTPS URL.
145    ///
146    /// - S3: `https://{bucket}.s3.{region}.amazonaws.com/{key}`
147    /// - GCS: `https://storage.googleapis.com/{bucket}/{key}`
148    /// - Azure: `https://{account}.blob.core.windows.net/{container}/{key}`
149    /// - HTTP/HTTPS: return as-is (upgraded to https for http)
150    pub fn to_https_url(&self, endpoint_override: Option<&str>) -> String {
151        if let Some(ep) = endpoint_override {
152            let base = ep.trim_end_matches('/');
153            return format!("{base}/{}/{}", self.bucket, self.key);
154        }
155        match &self.scheme {
156            CloudScheme::S3 => {
157                let region = self.region.as_deref().unwrap_or("us-east-1");
158                format!(
159                    "https://{}.s3.{}.amazonaws.com/{}",
160                    self.bucket, region, self.key
161                )
162            }
163            CloudScheme::Gs => {
164                format!(
165                    "https://storage.googleapis.com/{}/{}",
166                    self.bucket, self.key
167                )
168            }
169            CloudScheme::Az => {
170                // bucket is treated as "account/container"
171                // We store just "container" in bucket; account may be in endpoint.
172                format!("https://{}.blob.core.windows.net/{}", self.bucket, self.key)
173            }
174            CloudScheme::Http => {
175                format!("https://{}/{}", self.bucket, self.key)
176            }
177            CloudScheme::Https => {
178                format!("https://{}/{}", self.bucket, self.key)
179            }
180        }
181    }
182
183    /// Build the canonical host string used when signing requests.
184    pub fn signing_host(&self) -> String {
185        match &self.scheme {
186            CloudScheme::S3 => {
187                let region = self.region.as_deref().unwrap_or("us-east-1");
188                format!("{}.s3.{}.amazonaws.com", self.bucket, region)
189            }
190            CloudScheme::Gs => "storage.googleapis.com".to_owned(),
191            CloudScheme::Az => {
192                format!("{}.blob.core.windows.net", self.bucket)
193            }
194            CloudScheme::Http | CloudScheme::Https => self.bucket.clone(),
195        }
196    }
197
198    /// Return the URL path component used for signing.
199    pub fn signing_path(&self) -> String {
200        let key = if self.key.starts_with('/') {
201            self.key.clone()
202        } else {
203            format!("/{}", self.key)
204        };
205        match &self.scheme {
206            CloudScheme::Gs | CloudScheme::Az => {
207                format!("/{}{}", self.bucket, key)
208            }
209            _ => key,
210        }
211    }
212}
213
// ─────────────────────────────────────────────────────────────────────────────
// ByteRangeRequest
// ─────────────────────────────────────────────────────────────────────────────
217
218/// A request for a specific byte range of a cloud object.
219#[derive(Debug, Clone, PartialEq, Eq)]
220pub struct ByteRangeRequest {
221    /// The object URL to read from.
222    pub url: ObjectUrl,
223    /// Byte range (exclusive end).
224    pub range: std::ops::Range<u64>,
225}
226
227impl ByteRangeRequest {
228    /// Create a new byte-range request.
229    pub fn new(url: ObjectUrl, start: u64, end: u64) -> Self {
230        ByteRangeRequest {
231            url,
232            range: start..end,
233        }
234    }
235
236    /// Return the HTTP `Range` header value: `bytes=start-end_inclusive`.
237    pub fn to_http_range_header(&self) -> String {
238        let end_inclusive = self.range.end.saturating_sub(1);
239        format!("bytes={}-{}", self.range.start, end_inclusive)
240    }
241
242    /// Number of bytes in this range.
243    pub fn length(&self) -> u64 {
244        self.range.end.saturating_sub(self.range.start)
245    }
246}
247
// ─────────────────────────────────────────────────────────────────────────────
// ObjectMetadata
// ─────────────────────────────────────────────────────────────────────────────
251
252/// Metadata for a cloud object.
253#[derive(Debug, Clone, PartialEq, Eq)]
254pub struct ObjectMetadata {
255    /// The object URL.
256    pub url: ObjectUrl,
257    /// Total size in bytes.
258    pub size: u64,
259    /// MIME content type, if available.
260    pub content_type: Option<String>,
261    /// ETag string, if available.
262    pub etag: Option<String>,
263    /// Last-modified Unix timestamp, if available.
264    pub last_modified: Option<u64>,
265    /// User-defined metadata key/value pairs.
266    pub user_metadata: HashMap<String, String>,
267}
268
// ─────────────────────────────────────────────────────────────────────────────
// CloudCredentials
// ─────────────────────────────────────────────────────────────────────────────
272
/// Authentication credentials for cloud object storage.
///
/// The `Debug` implementation is written by hand so that secret material
/// (secret keys, account keys, tokens) is never printed; only non-secret
/// identifiers such as the access key ID or account name are shown.
#[derive(Clone, PartialEq, Eq)]
pub enum CloudCredentials {
    /// No authentication (public buckets).
    Anonymous,
    /// AWS / GCS access-key credentials.
    AccessKey {
        /// Access key ID.
        access_key_id: String,
        /// Secret access key.
        secret_access_key: String,
        /// Optional STS session token.
        session_token: Option<String>,
    },
    /// GCS service-account JSON file path.
    ServiceAccountFile {
        /// Path to the service account JSON file.
        path: String,
    },
    /// Azure Shared Key authentication.
    AzureSharedKey {
        /// Azure storage account name.
        account_name: String,
        /// Base64-encoded account key.
        account_key: String,
    },
    /// Azure SAS token.
    SasToken {
        /// The SAS token string.
        token: String,
    },
    /// Generic OAuth2 bearer token.
    Bearer {
        /// The bearer token string.
        token: String,
    },
}

impl std::fmt::Debug for CloudCredentials {
    /// Format the credentials with every secret replaced by a placeholder,
    /// so that `{:?}` in logs or panic messages cannot leak them.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        const REDACTED: &str = "<redacted>";
        match self {
            CloudCredentials::Anonymous => f.write_str("Anonymous"),
            CloudCredentials::AccessKey {
                access_key_id,
                session_token,
                ..
            } => f
                .debug_struct("AccessKey")
                .field("access_key_id", access_key_id)
                .field("secret_access_key", &REDACTED)
                // Keep the Some/None shape visible, but not the token itself.
                .field("session_token", &session_token.as_ref().map(|_| REDACTED))
                .finish(),
            CloudCredentials::ServiceAccountFile { path } => f
                .debug_struct("ServiceAccountFile")
                .field("path", path)
                .finish(),
            CloudCredentials::AzureSharedKey { account_name, .. } => f
                .debug_struct("AzureSharedKey")
                .field("account_name", account_name)
                .field("account_key", &REDACTED)
                .finish(),
            CloudCredentials::SasToken { .. } => f
                .debug_struct("SasToken")
                .field("token", &REDACTED)
                .finish(),
            CloudCredentials::Bearer { .. } => f
                .debug_struct("Bearer")
                .field("token", &REDACTED)
                .finish(),
        }
    }
}
310
// ─────────────────────────────────────────────────────────────────────────────
// PresignedUrlConfig / HttpMethod
// ─────────────────────────────────────────────────────────────────────────────
314
/// The HTTP verb a presigned URL is valid for.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HttpMethod {
    /// `GET`.
    Get,
    /// `PUT`.
    Put,
    /// `DELETE`.
    Delete,
    /// `HEAD`.
    Head,
}

impl HttpMethod {
    /// Upper-case verb exactly as it is written into a canonical request.
    fn as_str(&self) -> &'static str {
        match *self {
            Self::Get => "GET",
            Self::Put => "PUT",
            Self::Delete => "DELETE",
            Self::Head => "HEAD",
        }
    }
}
338
339/// Configuration for presigned URL generation.
340#[derive(Debug, Clone, PartialEq, Eq)]
341pub struct PresignedUrlConfig {
342    /// How many seconds the URL should remain valid.
343    pub expires_in_secs: u64,
344    /// HTTP method the presigned URL will allow.
345    pub method: HttpMethod,
346    /// Optional content-type constraint.
347    pub content_type: Option<String>,
348}
349
350impl PresignedUrlConfig {
351    /// Create a GET presigned URL configuration.
352    pub fn get(expires_in_secs: u64) -> Self {
353        PresignedUrlConfig {
354            expires_in_secs,
355            method: HttpMethod::Get,
356            content_type: None,
357        }
358    }
359
360    /// Create a PUT presigned URL configuration with a content type.
361    pub fn put(expires_in_secs: u64, content_type: impl Into<String>) -> Self {
362        PresignedUrlConfig {
363            expires_in_secs,
364            method: HttpMethod::Put,
365            content_type: Some(content_type.into()),
366        }
367    }
368}
369
// ─────────────────────────────────────────────────────────────────────────────
// Pure-Rust SHA-256 and HMAC-SHA256
// ─────────────────────────────────────────────────────────────────────────────
373
/// The 64 SHA-256 round constants, one per compression round.
const K: [u32; 64] = [
    0x428a2f98, 0x71374491, 0xb5c0fbcf, 0xe9b5dba5, 0x3956c25b, 0x59f111f1, 0x923f82a4, 0xab1c5ed5,
    0xd807aa98, 0x12835b01, 0x243185be, 0x550c7dc3, 0x72be5d74, 0x80deb1fe, 0x9bdc06a7, 0xc19bf174,
    0xe49b69c1, 0xefbe4786, 0x0fc19dc6, 0x240ca1cc, 0x2de92c6f, 0x4a7484aa, 0x5cb0a9dc, 0x76f988da,
    0x983e5152, 0xa831c66d, 0xb00327c8, 0xbf597fc7, 0xc6e00bf3, 0xd5a79147, 0x06ca6351, 0x14292967,
    0x27b70a85, 0x2e1b2138, 0x4d2c6dfc, 0x53380d13, 0x650a7354, 0x766a0abb, 0x81c2c92e, 0x92722c85,
    0xa2bfe8a1, 0xa81a664b, 0xc24b8b70, 0xc76c51a3, 0xd192e819, 0xd6990624, 0xf40e3585, 0x106aa070,
    0x19a4c116, 0x1e376c08, 0x2748774c, 0x34b0bcb5, 0x391c0cb3, 0x4ed8aa4a, 0x5b9cca4f, 0x682e6ff3,
    0x748f82ee, 0x78a5636f, 0x84c87814, 0x8cc70208, 0x90befffa, 0xa4506ceb, 0xbef9a3f7, 0xc67178f2,
];

/// Initial hash state (first 32 bits of the fractional parts of the square
/// roots of the first 8 primes).
const H0: [u32; 8] = [
    0x6a09e667, 0xbb67ae85, 0x3c6ef372, 0xa54ff53a, 0x510e527f, 0x9b05688c, 0x1f83d9ab, 0x5be0cd19,
];

/// Hash `data` with SHA-256 and return the 32-byte digest.
///
/// Pure-Rust implementation with no external crates. The whole input is
/// copied into a padded buffer before hashing.
pub fn sha256(data: &[u8]) -> [u8; 32] {
    // Padded message: data || 0x80 || zero bytes || 64-bit big-endian bit
    // length, with the total length a multiple of 64 bytes.
    let bit_len = (data.len() as u64).wrapping_mul(8);
    let mut padded: Vec<u8> = data.to_vec();
    padded.push(0x80);
    while padded.len() % 64 != 56 {
        padded.push(0x00);
    }
    padded.extend_from_slice(&bit_len.to_be_bytes());

    let mut state = H0;

    for block in padded.chunks_exact(64) {
        // Message schedule: the first 16 words come straight from the block,
        // the remaining 48 are derived from earlier words.
        let mut schedule = [0u32; 64];
        for (slot, word) in schedule.iter_mut().zip(block.chunks_exact(4)) {
            *slot = u32::from_be_bytes([word[0], word[1], word[2], word[3]]);
        }
        for t in 16..64 {
            let x = schedule[t - 15];
            let y = schedule[t - 2];
            let small_sigma0 = x.rotate_right(7) ^ x.rotate_right(18) ^ (x >> 3);
            let small_sigma1 = y.rotate_right(17) ^ y.rotate_right(19) ^ (y >> 10);
            schedule[t] = schedule[t - 16]
                .wrapping_add(small_sigma0)
                .wrapping_add(schedule[t - 7])
                .wrapping_add(small_sigma1);
        }

        // Compression: 64 rounds over the eight working variables.
        let mut vars = state;
        for (&round_constant, &word) in K.iter().zip(schedule.iter()) {
            let [a, b, c, d, e, f, g, h] = vars;

            let big_sigma1 = e.rotate_right(6) ^ e.rotate_right(11) ^ e.rotate_right(25);
            let choose = (e & f) ^ ((!e) & g);
            let t1 = h
                .wrapping_add(big_sigma1)
                .wrapping_add(choose)
                .wrapping_add(round_constant)
                .wrapping_add(word);

            let big_sigma0 = a.rotate_right(2) ^ a.rotate_right(13) ^ a.rotate_right(22);
            let majority = (a & b) ^ (a & c) ^ (b & c);
            let t2 = big_sigma0.wrapping_add(majority);

            vars = [t1.wrapping_add(t2), a, b, c, d.wrapping_add(t1), e, f, g];
        }

        // Fold the block's result back into the running state.
        for (acc, var) in state.iter_mut().zip(vars.iter()) {
            *acc = acc.wrapping_add(*var);
        }
    }

    // Serialize the eight state words big-endian.
    let mut digest = [0u8; 32];
    for (dst, word) in digest.chunks_exact_mut(4).zip(state.iter()) {
        dst.copy_from_slice(&word.to_be_bytes());
    }
    digest
}
459
460/// Compute HMAC-SHA256.
461pub fn hmac_sha256(key: &[u8], data: &[u8]) -> [u8; 32] {
462    const BLOCK_SIZE: usize = 64;
463
464    // Derive the actual HMAC key (hash if longer than block size)
465    let mut k = [0u8; BLOCK_SIZE];
466    if key.len() > BLOCK_SIZE {
467        let hashed = sha256(key);
468        k[..32].copy_from_slice(&hashed);
469    } else {
470        k[..key.len()].copy_from_slice(key);
471    }
472
473    let mut ipad = [0u8; BLOCK_SIZE];
474    let mut opad = [0u8; BLOCK_SIZE];
475    for i in 0..BLOCK_SIZE {
476        ipad[i] = k[i] ^ 0x36;
477        opad[i] = k[i] ^ 0x5c;
478    }
479
480    let mut inner = ipad.to_vec();
481    inner.extend_from_slice(data);
482    let inner_hash = sha256(&inner);
483
484    let mut outer = opad.to_vec();
485    outer.extend_from_slice(&inner_hash);
486    sha256(&outer)
487}
488
489/// Compute HMAC-SHA256 and return the result as a lowercase hex string.
490pub fn hmac_sha256_hex(key: &[u8], data: &[u8]) -> String {
491    hex_encode(&hmac_sha256(key, data))
492}
493
/// Render `bytes` as lowercase hexadecimal, two digits per byte.
pub fn hex_encode(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut hex = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // High nibble first, then low nibble.
        hex.push(DIGITS[usize::from(byte >> 4)] as char);
        hex.push(DIGITS[usize::from(byte & 0x0f)] as char);
    }
    hex
}
504
// ─────────────────────────────────────────────────────────────────────────────
// PresignedUrlGenerator
// ─────────────────────────────────────────────────────────────────────────────
508
509/// Generates presigned URLs using AWS SigV4 / GCS v4 signing.
510///
511/// The implementation is entirely pure Rust — no external cryptographic crates.
512pub struct PresignedUrlGenerator {
513    /// The credentials used for signing.
514    pub credentials: CloudCredentials,
515    /// AWS / GCS region.
516    pub region: String,
517}
518
519impl PresignedUrlGenerator {
520    /// Create a new generator.
521    pub fn new(credentials: CloudCredentials, region: impl Into<String>) -> Self {
522        PresignedUrlGenerator {
523            credentials,
524            region: region.into(),
525        }
526    }
527
528    /// Format a Unix timestamp as an AWS date string (`YYYYMMDD`).
529    fn format_date(ts: u64) -> String {
530        // Days since Unix epoch
531        let days = ts / 86_400;
532        let (year, month, day) = days_to_ymd(days);
533        format!("{year:04}{month:02}{day:02}")
534    }
535
536    /// Format a Unix timestamp as an AWS datetime string (`YYYYMMDDTHHmmSSZ`).
537    fn format_datetime(ts: u64) -> String {
538        let date = Self::format_date(ts);
539        let rem = ts % 86_400;
540        let h = rem / 3600;
541        let m = (rem % 3600) / 60;
542        let s = rem % 60;
543        format!("{date}T{h:02}{m:02}{s:02}Z")
544    }
545
546    /// Derive the AWS SigV4 signing key.
547    fn derive_signing_key(secret: &str, date: &str, region: &str, service: &str) -> [u8; 32] {
548        let k_date = hmac_sha256(format!("AWS4{secret}").as_bytes(), date.as_bytes());
549        let k_region = hmac_sha256(&k_date, region.as_bytes());
550        let k_service = hmac_sha256(&k_region, service.as_bytes());
551        hmac_sha256(&k_service, b"aws4_request")
552    }
553
554    /// Percent-encode a string using AWS URI encoding rules.
555    fn uri_encode(s: &str, encode_slash: bool) -> String {
556        let mut out = String::with_capacity(s.len());
557        for byte in s.bytes() {
558            match byte {
559                b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => {
560                    out.push(byte as char);
561                }
562                b'/' if !encode_slash => out.push('/'),
563                other => {
564                    use std::fmt::Write;
565                    let _ = write!(out, "%{other:02X}");
566                }
567            }
568        }
569        out
570    }
571
572    /// Build a sorted canonical query string from key-value pairs.
573    pub fn canonical_query_string(&self, params: &[(String, String)]) -> String {
574        let mut sorted: Vec<(String, String)> = params
575            .iter()
576            .map(|(k, v)| (Self::uri_encode(k, true), Self::uri_encode(v, true)))
577            .collect();
578        sorted.sort_by(|(a, _), (b, _)| a.cmp(b));
579        sorted
580            .iter()
581            .map(|(k, v)| format!("{k}={v}"))
582            .collect::<Vec<_>>()
583            .join("&")
584    }
585
586    /// Generate an AWS SigV4 presigned URL.
587    pub fn generate_s3(
588        &self,
589        url: &ObjectUrl,
590        config: &PresignedUrlConfig,
591        timestamp_unix: u64,
592    ) -> Result<String, CloudError> {
593        let (access_key_id, secret_access_key) = match &self.credentials {
594            CloudCredentials::AccessKey {
595                access_key_id,
596                secret_access_key,
597                ..
598            } => (access_key_id.as_str(), secret_access_key.as_str()),
599            _ => return Err(CloudError::MissingCredentials),
600        };
601
602        let service = "s3";
603        let date = Self::format_date(timestamp_unix);
604        let datetime = Self::format_datetime(timestamp_unix);
605
606        let credential = format!(
607            "{access_key_id}/{date}/{}/{service}/aws4_request",
608            self.region
609        );
610
611        let host = url.signing_host();
612        let path = Self::uri_encode(&url.key, false);
613        let canonical_path = format!("/{path}");
614
615        let mut query_params: Vec<(String, String)> = vec![
616            ("X-Amz-Algorithm".to_owned(), "AWS4-HMAC-SHA256".to_owned()),
617            ("X-Amz-Credential".to_owned(), credential.clone()),
618            ("X-Amz-Date".to_owned(), datetime.clone()),
619            (
620                "X-Amz-Expires".to_owned(),
621                config.expires_in_secs.to_string(),
622            ),
623            ("X-Amz-SignedHeaders".to_owned(), "host".to_owned()),
624        ];
625
626        if let CloudCredentials::AccessKey {
627            session_token: Some(tok),
628            ..
629        } = &self.credentials
630        {
631            query_params.push(("X-Amz-Security-Token".to_owned(), tok.clone()));
632        }
633
634        let canonical_query = self.canonical_query_string(&query_params);
635
636        let canonical_headers = format!("host:{host}\n");
637        let signed_headers = "host";
638
639        // For presigned URLs, the payload hash is "UNSIGNED-PAYLOAD"
640        let payload_hash = "UNSIGNED-PAYLOAD";
641
642        let canonical_request = format!(
643            "{method}\n{path}\n{query}\n{headers}\n{signed}\n{payload}",
644            method = config.method.as_str(),
645            path = canonical_path,
646            query = canonical_query,
647            headers = canonical_headers,
648            signed = signed_headers,
649            payload = payload_hash,
650        );
651
652        let scope = format!("{date}/{}/{service}/aws4_request", self.region);
653        let string_to_sign = format!(
654            "AWS4-HMAC-SHA256\n{datetime}\n{scope}\n{hash}",
655            hash = hex_encode(&sha256(canonical_request.as_bytes())),
656        );
657
658        let signing_key = Self::derive_signing_key(secret_access_key, &date, &self.region, service);
659        let signature = hmac_sha256_hex(&signing_key, string_to_sign.as_bytes());
660
661        let mut final_params = query_params;
662        final_params.push(("X-Amz-Signature".to_owned(), signature));
663        let final_query = self.canonical_query_string(&final_params);
664
665        let base_url = format!("https://{host}{canonical_path}");
666        Ok(format!("{base_url}?{final_query}"))
667    }
668
669    /// Generate a GCS v4 presigned URL (same signing algorithm as S3 SigV4).
670    pub fn generate_gcs(
671        &self,
672        url: &ObjectUrl,
673        config: &PresignedUrlConfig,
674        timestamp_unix: u64,
675    ) -> Result<String, CloudError> {
676        let (access_key_id, secret_access_key) = match &self.credentials {
677            CloudCredentials::AccessKey {
678                access_key_id,
679                secret_access_key,
680                ..
681            } => (access_key_id.as_str(), secret_access_key.as_str()),
682            CloudCredentials::ServiceAccountFile { path } => {
683                // In a real implementation we'd parse the JSON; here we use the path
684                // as a stand-in identifier so the signature is deterministic in tests.
685                return Err(CloudError::PresignError(format!(
686                    "service account file signing requires JSON parsing (path: {path})"
687                )));
688            }
689            _ => return Err(CloudError::MissingCredentials),
690        };
691
692        let service = "storage";
693        let date = Self::format_date(timestamp_unix);
694        let datetime = Self::format_datetime(timestamp_unix);
695        let host = "storage.googleapis.com";
696        let canonical_path = format!("/{}/{}", url.bucket, Self::uri_encode(&url.key, false));
697
698        let credential = format!(
699            "{access_key_id}/{date}/{}/{service}/goog4_request",
700            self.region
701        );
702
703        let query_params: Vec<(String, String)> = vec![
704            (
705                "X-Goog-Algorithm".to_owned(),
706                "GOOG4-HMAC-SHA256".to_owned(),
707            ),
708            ("X-Goog-Credential".to_owned(), credential.clone()),
709            ("X-Goog-Date".to_owned(), datetime.clone()),
710            (
711                "X-Goog-Expires".to_owned(),
712                config.expires_in_secs.to_string(),
713            ),
714            ("X-Goog-SignedHeaders".to_owned(), "host".to_owned()),
715        ];
716
717        let canonical_query = self.canonical_query_string(&query_params);
718        let canonical_headers = format!("host:{host}\n");
719        let signed_headers = "host";
720        let payload_hash = "UNSIGNED-PAYLOAD";
721
722        let canonical_request = format!(
723            "{method}\n{path}\n{query}\n{headers}\n{signed}\n{payload}",
724            method = config.method.as_str(),
725            path = canonical_path,
726            query = canonical_query,
727            headers = canonical_headers,
728            signed = signed_headers,
729            payload = payload_hash,
730        );
731
732        let scope = format!("{date}/{}/{service}/goog4_request", self.region);
733        let string_to_sign = format!(
734            "GOOG4-HMAC-SHA256\n{datetime}\n{scope}\n{hash}",
735            hash = hex_encode(&sha256(canonical_request.as_bytes())),
736        );
737
738        let signing_key = Self::derive_signing_key(secret_access_key, &date, &self.region, service);
739        let signature = hmac_sha256_hex(&signing_key, string_to_sign.as_bytes());
740
741        let mut final_params = query_params;
742        final_params.push(("X-Goog-Signature".to_owned(), signature));
743        let final_query = self.canonical_query_string(&final_params);
744
745        Ok(format!("https://{host}{canonical_path}?{final_query}"))
746    }
747}
748
// ─────────────────────────────────────────────────────────────────────────────
// Date arithmetic helper
// ─────────────────────────────────────────────────────────────────────────────
752
/// Turn a day count since 1970-01-01 into a `(year, month, day)` triple.
///
/// `month` is 1-12 and `day` is 1-31, in the proleptic Gregorian calendar.
fn days_to_ymd(days: u64) -> (u32, u32, u32) {
    // Civil-from-days algorithm.
    // Reference: http://howardhinnant.github.io/date_algorithms.html
    const DAYS_PER_ERA: i64 = 146_097; // one 400-year Gregorian cycle

    // Shift the epoch to 0000-03-01 so leap days fall at the end of a year.
    let shifted = days as i64 + 719_468;
    let era: i64 = if shifted < 0 {
        (shifted - (DAYS_PER_ERA - 1)) / DAYS_PER_ERA
    } else {
        shifted / DAYS_PER_ERA
    };
    let day_of_era = (shifted - era * DAYS_PER_ERA) as u64;
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);

    // Month index counted from March (0 = March … 11 = February).
    let march_based_month = (5 * day_of_year + 2) / 153;
    let day = day_of_year - (153 * march_based_month + 2) / 5 + 1;

    // January and February belong to the following civil year.
    let (month, year_carry) = if march_based_month < 10 {
        (march_based_month + 3, 0)
    } else {
        (march_based_month - 9, 1)
    };
    let year = year_of_era as i64 + era * 400 + year_carry;

    (year as u32, month as u32, day as u32)
}
769
// ─────────────────────────────────────────────────────────────────────────────
// MultipartUploadState
// ─────────────────────────────────────────────────────────────────────────────
773
/// One successfully uploaded part of a multipart upload.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompletedPart {
    /// Part number, counted from 1.
    pub part_number: u16,
    /// ETag the server returned for this part.
    pub etag: String,
    /// Number of bytes in this part.
    pub size: u64,
}
784
785/// State machine for an in-progress multipart upload.
786#[derive(Debug, Clone, PartialEq, Eq)]
787pub struct MultipartUploadState {
788    /// The upload ID assigned by the cloud provider.
789    pub upload_id: String,
790    /// The target object URL.
791    pub url: ObjectUrl,
792    /// Parts that have been successfully uploaded.
793    pub parts: Vec<CompletedPart>,
794    /// Nominal part size in bytes.
795    pub part_size: u64,
796}
797
798impl MultipartUploadState {
799    /// Create a new multipart upload tracker.
800    pub fn new(upload_id: impl Into<String>, url: ObjectUrl, part_size: u64) -> Self {
801        MultipartUploadState {
802            upload_id: upload_id.into(),
803            url,
804            parts: Vec::new(),
805            part_size,
806        }
807    }
808
809    /// Record a completed part.
810    pub fn add_part(&mut self, part_number: u16, etag: impl Into<String>, size: u64) {
811        self.parts.push(CompletedPart {
812            part_number,
813            etag: etag.into(),
814            size,
815        });
816    }
817
818    /// Total uploaded bytes across all parts.
819    pub fn total_size(&self) -> u64 {
820        self.parts.iter().map(|p| p.size).sum()
821    }
822
823    /// Number of parts recorded.
824    pub fn part_count(&self) -> usize {
825        self.parts.len()
826    }
827
828    /// Build the S3 `CompleteMultipartUpload` XML body.
829    ///
830    /// Parts are emitted in ascending part-number order.
831    pub fn to_xml(&self) -> String {
832        let mut sorted = self.parts.clone();
833        sorted.sort_by_key(|p| p.part_number);
834
835        let mut xml = String::from("<CompleteMultipartUpload>\n");
836        for part in &sorted {
837            xml.push_str("  <Part>\n");
838            xml.push_str(&format!(
839                "    <PartNumber>{}</PartNumber>\n",
840                part.part_number
841            ));
842            xml.push_str(&format!("    <ETag>{}</ETag>\n", part.etag));
843            xml.push_str("  </Part>\n");
844        }
845        xml.push_str("</CompleteMultipartUpload>");
846        xml
847    }
848}
849
// ─────────────────────────────────────────────────────────────────────────────
// CloudRangeCoalescer
// ─────────────────────────────────────────────────────────────────────────────
853
854/// Merges nearby byte-range requests into larger ones to reduce round-trip
855/// overhead when reading from cloud object storage.
856pub struct CloudRangeCoalescer {
857    /// Maximum gap between two ranges that should still be merged.
858    pub max_gap_bytes: u64,
859    /// Maximum total size of a coalesced request.
860    pub max_request_size: u64,
861    /// Minimum size of a request (avoid sending tiny reads).
862    pub min_request_size: u64,
863}
864
865impl Default for CloudRangeCoalescer {
866    fn default() -> Self {
867        Self::new()
868    }
869}
870
871impl CloudRangeCoalescer {
872    /// Create a coalescer with sensible defaults for cloud storage:
873    /// - `max_gap_bytes` = 512 KiB
874    /// - `max_request_size` = 8 MiB
875    /// - `min_request_size` = 64 KiB
876    pub fn new() -> Self {
877        CloudRangeCoalescer {
878            max_gap_bytes: 512 * 1024,
879            max_request_size: 8 * 1024 * 1024,
880            min_request_size: 64 * 1024,
881        }
882    }
883
884    /// Coalesce a list of byte-range requests.
885    ///
886    /// All input requests must target the **same** URL.  Requests are sorted by
887    /// start offset and then merged when:
888    /// - the gap between consecutive ranges is ≤ `max_gap_bytes`, **and**
889    /// - the resulting coalesced range would not exceed `max_request_size`.
890    pub fn coalesce(&self, mut ranges: Vec<ByteRangeRequest>) -> Vec<ByteRangeRequest> {
891        if ranges.is_empty() {
892            return ranges;
893        }
894
895        // Sort by start offset
896        ranges.sort_by_key(|r| r.range.start);
897
898        let url = ranges[0].url.clone();
899        let mut coalesced: Vec<ByteRangeRequest> = Vec::new();
900        let mut current_start = ranges[0].range.start;
901        let mut current_end = ranges[0].range.end;
902
903        for req in ranges.into_iter().skip(1) {
904            let gap = req.range.start.saturating_sub(current_end);
905            let new_end = req.range.end.max(current_end);
906            let new_size = new_end - current_start;
907
908            if gap <= self.max_gap_bytes && new_size <= self.max_request_size {
909                // Merge
910                current_end = new_end;
911            } else {
912                coalesced.push(ByteRangeRequest::new(
913                    url.clone(),
914                    current_start,
915                    current_end,
916                ));
917                current_start = req.range.start;
918                current_end = req.range.end;
919            }
920        }
921        coalesced.push(ByteRangeRequest::new(url, current_start, current_end));
922        coalesced
923    }
924
925    /// Extract a sub-range from a coalesced response buffer.
926    ///
927    /// `coalesced_start` is the byte offset at which `coalesced_data` begins.
928    /// `sub_range` is the desired slice within the full object.
929    ///
930    /// # Panics
931    ///
932    /// Panics if `sub_range` does not fall within the coalesced data window.
933    pub fn slice_response<'a>(
934        coalesced_data: &'a [u8],
935        coalesced_start: u64,
936        sub_range: &std::ops::Range<u64>,
937    ) -> &'a [u8] {
938        let offset = (sub_range.start - coalesced_start) as usize;
939        let len = (sub_range.end - sub_range.start) as usize;
940        &coalesced_data[offset..offset + len]
941    }
942}