//! `argus_storage/s3.rs` — S3-compatible object storage backend.
#[cfg(feature = "s3")]
use std::sync::Arc;

#[cfg(feature = "s3")]
use anyhow::{Context, Result};
#[cfg(feature = "s3")]
use async_trait::async_trait;
#[cfg(feature = "s3")]
use aws_sdk_s3::{primitives::ByteStream, Client};
#[cfg(feature = "s3")]
use bytes::Bytes;

#[cfg(feature = "s3")]
use argus_common::types::{CrawlJob, FetchResult};

#[cfg(feature = "s3")]
use crate::storage_trait::{url_to_fragment, Storage};
/// S3-compatible object storage backend
/// Works with AWS S3, MinIO, DigitalOcean Spaces, etc.
#[cfg(feature = "s3")]
pub struct S3Storage {
    // Reference-counted handle to the AWS SDK client.
    client: Arc<Client>,
    // Name of the target bucket.
    bucket: String,
    // Prefix prepended to every object key; empty string when no prefix was given.
    prefix: String,
}
25
#[cfg(feature = "s3")]
impl S3Storage {
    /// Create a new S3 storage backend using the default AWS credential and
    /// region resolution chain (environment, profile, IMDS, ...).
    ///
    /// # Arguments
    /// * `bucket` - S3 bucket name
    /// * `prefix` - Optional prefix for all keys (e.g., "crawl/")
    ///
    /// # Errors
    /// Returns an error if the SDK client cannot be constructed.
    pub async fn new(bucket: String, prefix: Option<String>) -> Result<Self> {
        let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
        Ok(Self::from_client(Client::new(&config), bucket, prefix))
    }

    /// Create with custom endpoint (for MinIO, etc.)
    ///
    /// # Arguments
    /// * `endpoint_url` - Base URL of the S3-compatible service (e.g., "http://localhost:9000")
    pub async fn new_with_endpoint(
        bucket: String,
        prefix: Option<String>,
        endpoint_url: String,
    ) -> Result<Self> {
        let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
        let s3_config = aws_sdk_s3::config::Builder::from(&config)
            .endpoint_url(endpoint_url)
            .force_path_style(true) // Required for MinIO
            .build();

        Ok(Self::from_client(Client::from_conf(s3_config), bucket, prefix))
    }

    /// Shared constructor tail: wrap the client and normalize the optional prefix.
    fn from_client(client: Client, bucket: String, prefix: Option<String>) -> Self {
        Self {
            client: Arc::new(client),
            bucket,
            prefix: prefix.unwrap_or_default(),
        }
    }

    /// Key under which the metadata JSON for a URL fragment is stored.
    fn metadata_key(&self, fragment: &str) -> String {
        format!("{}page/{}.json", self.prefix, fragment)
    }

    /// Key under which the raw response body for a URL fragment is stored.
    fn body_key(&self, fragment: &str) -> String {
        format!("{}body/{}.bin", self.prefix, fragment)
    }

    /// Get an object from S3, reading the full body into memory.
    ///
    /// # Errors
    /// Fails if the object does not exist, is inaccessible, or the body
    /// stream cannot be read to completion.
    pub async fn get_object(&self, key: &str) -> Result<Bytes> {
        let response = self
            .client
            .get_object()
            .bucket(&self.bucket)
            .key(key)
            .send()
            .await
            .context("Failed to get object from S3")?;

        let data = response
            .body
            .collect()
            .await
            .context("Failed to read S3 object body")?;

        Ok(data.into_bytes())
    }

    /// Check if bucket exists and is accessible (HeadBucket).
    pub async fn verify_bucket(&self) -> Result<()> {
        self.client
            .head_bucket()
            .bucket(&self.bucket)
            .send()
            .await
            .context("Failed to access S3 bucket")?;

        Ok(())
    }

    /// List all object keys under `self.prefix + prefix`.
    ///
    /// Walks every page of ListObjectsV2 results via the SDK paginator, so
    /// listings are not truncated at the per-page maximum of 1000 keys.
    pub async fn list_objects(&self, prefix: &str) -> Result<Vec<String>> {
        let full_prefix = format!("{}{}", self.prefix, prefix);

        let mut pages = self
            .client
            .list_objects_v2()
            .bucket(&self.bucket)
            .prefix(&full_prefix)
            .into_paginator()
            .send();

        let mut keys = Vec::new();
        while let Some(page) = pages.next().await {
            let page = page.context("Failed to list S3 objects")?;
            keys.extend(
                page.contents()
                    .iter()
                    .filter_map(|obj| obj.key().map(str::to_string)),
            );
        }

        Ok(keys)
    }
}
127
#[cfg(feature = "s3")]
#[async_trait]
impl Storage for S3Storage {
    /// Persist one fetch result: the raw body under `body/<fragment>.bin`,
    /// then a JSON metadata record under `page/<fragment>.json` that points
    /// back at the body key.
    ///
    /// # Errors
    /// Fails if either PUT is rejected, if metadata serialization fails, or
    /// if the system clock reports a time before the UNIX epoch.
    async fn record_fetch(&self, job: &CrawlJob, result: &FetchResult) -> Result<()> {
        let fragment = url_to_fragment(&job.normalized_url);

        // Store the body first so the metadata record never points at a
        // missing object.
        let body_key = self.body_key(&fragment);
        self.client
            .put_object()
            .bucket(&self.bucket)
            .key(&body_key)
            // Cloning Bytes is a refcount bump, not a data copy.
            .body(ByteStream::from(result.body.clone()))
            .content_type(
                result
                    .content_type
                    .as_deref()
                    .unwrap_or("application/octet-stream"),
            )
            .send()
            .await
            .context("Failed to store body in S3")?;

        // Propagate (rather than panic on) a clock set before the epoch.
        let fetched_at_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .context("System clock is before the UNIX epoch")?
            .as_millis() as u64;

        // Store metadata
        let metadata = serde_json::json!({
            "url": job.url,
            "final_url": result.final_url,
            "status": result.status,
            "content_type": result.content_type,
            "depth": job.depth,
            "body_key": body_key,
            "fetched_at_ms": fetched_at_ms,
        });

        let metadata_json =
            serde_json::to_vec(&metadata).context("Failed to serialize fetch metadata")?;
        let metadata_key = self.metadata_key(&fragment);

        self.client
            .put_object()
            .bucket(&self.bucket)
            .key(&metadata_key)
            .body(ByteStream::from(metadata_json))
            .content_type("application/json")
            .send()
            .await
            .context("Failed to store metadata in S3")?;

        Ok(())
    }
}
181
182#[cfg(not(feature = "s3"))]
183pub struct S3Storage;
184
185#[cfg(not(feature = "s3"))]
186impl S3Storage {
187    pub async fn new(_bucket: String, _prefix: Option<String>) -> anyhow::Result<Self> {
188        anyhow::bail!("S3 storage not enabled. Compile with 's3' feature.")
189    }
190
191    pub async fn new_with_endpoint(
192        _bucket: String,
193        _prefix: Option<String>,
194        _endpoint_url: String,
195    ) -> anyhow::Result<Self> {
196        anyhow::bail!("S3 storage not enabled. Compile with 's3' feature.")
197    }
198}
199
// Integration smoke tests; both are `#[ignore]`d because they need a live
// S3-compatible service and credentials in the environment.
#[cfg(all(test, feature = "s3"))]
mod tests {
    use super::*;

    // Verifies default-config construction plus bucket access against real S3.
    #[tokio::test]
    #[ignore] // Requires S3/MinIO setup
    async fn s3_storage_basic() {
        let storage = S3Storage::new("test-bucket".to_string(), Some("test/".to_string()))
            .await
            .unwrap();

        storage.verify_bucket().await.unwrap();
    }

    // Verifies the custom-endpoint path against a local MinIO on its default port.
    #[tokio::test]
    #[ignore] // Requires MinIO setup
    async fn minio_storage() {
        let storage = S3Storage::new_with_endpoint(
            "test-bucket".to_string(),
            Some("test/".to_string()),
            "http://localhost:9000".to_string(),
        )
        .await
        .unwrap();

        storage.verify_bucket().await.unwrap();
    }
}