1#[cfg(feature = "s3")]
2use anyhow::{Context, Result};
3#[cfg(feature = "s3")]
4use async_trait::async_trait;
5#[cfg(feature = "s3")]
6use aws_sdk_s3::{primitives::ByteStream, Client};
7#[cfg(feature = "s3")]
8use bytes::Bytes;
9#[cfg(feature = "s3")]
10use std::sync::Arc;
11
12#[cfg(feature = "s3")]
13use crate::storage_trait::{url_to_fragment, Storage};
14#[cfg(feature = "s3")]
15use argus_common::types::{CrawlJob, FetchResult};
16
/// [`Storage`] backend that writes crawl results into an S3 bucket, or into any
/// S3-compatible service reachable through [`S3Storage::new_with_endpoint`].
///
/// Cloning is cheap: the SDK client is shared behind an [`Arc`].
#[cfg(feature = "s3")]
#[derive(Clone, Debug)]
pub struct S3Storage {
    /// SDK client used for every request issued by this backend.
    client: Arc<Client>,
    /// Name of the bucket all objects are read from / written to.
    bucket: String,
    /// String prepended verbatim to every key this type generates (may be empty).
    prefix: String,
}
25
#[cfg(feature = "s3")]
impl S3Storage {
    /// Creates a backend whose client is built from the shared AWS configuration
    /// returned by `aws_config::load_defaults`.
    ///
    /// `prefix` is prepended verbatim to every key this type generates; include a
    /// trailing `/` if it should behave like a directory (e.g. `"crawl/"`).
    ///
    /// # Errors
    ///
    /// Currently never fails; the `Result` is kept for API stability.
    pub async fn new(bucket: String, prefix: Option<String>) -> Result<Self> {
        let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
        Ok(Self::from_client(Client::new(&config), bucket, prefix))
    }

    /// Creates a backend that talks to a custom S3-compatible endpoint
    /// (for example a local MinIO server), using path-style addressing.
    ///
    /// # Errors
    ///
    /// Currently never fails; the `Result` is kept for API stability.
    pub async fn new_with_endpoint(
        bucket: String,
        prefix: Option<String>,
        endpoint_url: String,
    ) -> Result<Self> {
        let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
        // Path-style (`endpoint/bucket/key`) addressing; presumably chosen because
        // custom endpoints usually lack per-bucket virtual-host DNS.
        let s3_config = aws_sdk_s3::config::Builder::from(&config)
            .endpoint_url(endpoint_url)
            .force_path_style(true)
            .build();

        Ok(Self::from_client(Client::from_conf(s3_config), bucket, prefix))
    }

    /// Wraps an already-built client; shared tail of the public constructors.
    fn from_client(client: Client, bucket: String, prefix: Option<String>) -> Self {
        Self {
            client: Arc::new(client),
            bucket,
            prefix: prefix.unwrap_or_default(),
        }
    }

    /// Key of the JSON metadata document for a URL fragment:
    /// `{prefix}page/{fragment}.json`.
    fn metadata_key(&self, fragment: &str) -> String {
        format!("{}page/{}.json", self.prefix, fragment)
    }

    /// Key of the raw response body for a URL fragment:
    /// `{prefix}body/{fragment}.bin`.
    fn body_key(&self, fragment: &str) -> String {
        format!("{}body/{}.bin", self.prefix, fragment)
    }

    /// Downloads the object stored under `key` and buffers its whole body in memory.
    ///
    /// `key` is used as-is: `self.prefix` is NOT prepended, so keys returned by
    /// [`Self::list_objects`] can be passed straight through.
    ///
    /// # Errors
    ///
    /// Fails if the request fails (missing object, permissions, network) or the
    /// body stream cannot be read to completion.
    pub async fn get_object(&self, key: &str) -> Result<Bytes> {
        let response = self
            .client
            .get_object()
            .bucket(&self.bucket)
            .key(key)
            .send()
            .await
            .context("Failed to get object from S3")?;

        let data = response
            .body
            .collect()
            .await
            .context("Failed to read S3 object body")?;

        Ok(data.into_bytes())
    }

    /// Checks that the configured bucket is reachable by issuing a `HeadBucket` request.
    ///
    /// # Errors
    ///
    /// Fails if the bucket does not exist, is not accessible with the current
    /// credentials, or the endpoint cannot be reached.
    pub async fn verify_bucket(&self) -> Result<()> {
        self.client
            .head_bucket()
            .bucket(&self.bucket)
            .send()
            .await
            .context("Failed to access S3 bucket")?;

        Ok(())
    }

    /// Lists every key under `self.prefix + prefix`.
    ///
    /// Returned keys are full bucket keys (they include `self.prefix`).
    ///
    /// # Errors
    ///
    /// Fails if any page request fails; partial results are discarded.
    pub async fn list_objects(&self, prefix: &str) -> Result<Vec<String>> {
        let full_prefix = format!("{}{}", self.prefix, prefix);
        let mut keys = Vec::new();
        let mut continuation_token: Option<String> = None;

        // ListObjectsV2 returns results one page at a time (at most 1000 keys per
        // page); keep following the continuation token until none is returned.
        loop {
            let response = self
                .client
                .list_objects_v2()
                .bucket(&self.bucket)
                .prefix(&full_prefix)
                .set_continuation_token(continuation_token.take())
                .send()
                .await
                .context("Failed to list S3 objects")?;

            keys.extend(
                response
                    .contents()
                    .iter()
                    .filter_map(|obj| obj.key().map(str::to_string)),
            );

            match response.next_continuation_token() {
                Some(token) => continuation_token = Some(token.to_string()),
                None => break,
            }
        }

        Ok(keys)
    }
}
127
#[cfg(feature = "s3")]
#[async_trait]
impl Storage for S3Storage {
    /// Persists one fetch as two objects:
    ///
    /// * the raw response body under `{prefix}body/{fragment}.bin`, tagged with the
    ///   response content type (or `application/octet-stream` if none), and
    /// * a JSON metadata document under `{prefix}page/{fragment}.json` that records
    ///   the URL, final URL, status, content type, depth, body key and a
    ///   `fetched_at_ms` wall-clock timestamp (milliseconds since the UNIX epoch).
    ///
    /// `fragment` is derived from `job.normalized_url` via `url_to_fragment`.
    ///
    /// # Errors
    ///
    /// Fails if either upload fails, if the metadata cannot be serialized, or if
    /// the system clock reports a time before the UNIX epoch. If the metadata step
    /// fails, the body object has already been written.
    async fn record_fetch(&self, job: &CrawlJob, result: &FetchResult) -> Result<()> {
        let fragment = url_to_fragment(&job.normalized_url);

        let body_key = self.body_key(&fragment);
        self.client
            .put_object()
            .bucket(&self.bucket)
            .key(&body_key)
            .body(ByteStream::from(result.body.clone()))
            .content_type(
                result
                    .content_type
                    .as_deref()
                    .unwrap_or("application/octet-stream"),
            )
            .send()
            .await
            .context("Failed to store body in S3")?;

        // A clock set before 1970 is an environment problem, not a bug in this
        // code: report it as an error instead of panicking the task.
        let fetched_at_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .context("System clock is before the UNIX epoch")?
            .as_millis();
        // Saturate instead of silently truncating the u128 millisecond count.
        let fetched_at_ms = u64::try_from(fetched_at_ms).unwrap_or(u64::MAX);

        let metadata = serde_json::json!({
            "url": job.url,
            "final_url": result.final_url,
            "status": result.status,
            "content_type": result.content_type,
            "depth": job.depth,
            "body_key": body_key,
            "fetched_at_ms": fetched_at_ms,
        });

        let metadata_json =
            serde_json::to_vec(&metadata).context("Failed to serialize fetch metadata")?;
        let metadata_key = self.metadata_key(&fragment);

        self.client
            .put_object()
            .bucket(&self.bucket)
            .key(&metadata_key)
            .body(ByteStream::from(metadata_json))
            .content_type("application/json")
            .send()
            .await
            .context("Failed to store metadata in S3")?;

        Ok(())
    }
}
181
/// Placeholder used when the crate is built without the `s3` feature.
///
/// It cannot be constructed through its constructors; they always return an
/// error explaining how to enable S3 support.
#[cfg(not(feature = "s3"))]
#[derive(Debug, Clone)]
pub struct S3Storage;
184
185#[cfg(not(feature = "s3"))]
186impl S3Storage {
187 pub async fn new(_bucket: String, _prefix: Option<String>) -> anyhow::Result<Self> {
188 anyhow::bail!("S3 storage not enabled. Compile with 's3' feature.")
189 }
190
191 pub async fn new_with_endpoint(
192 _bucket: String,
193 _prefix: Option<String>,
194 _endpoint_url: String,
195 ) -> anyhow::Result<Self> {
196 anyhow::bail!("S3 storage not enabled. Compile with 's3' feature.")
197 }
198}
199
/// Integration tests against real services; ignored by default and run with
/// `cargo test --features s3 -- --ignored` once the services are available.
#[cfg(all(test, feature = "s3"))]
mod tests {
    use super::*;

    /// Bucket both tests expect to already exist.
    const TEST_BUCKET: &str = "test-bucket";
    /// Key prefix used by both tests.
    const TEST_PREFIX: &str = "test/";
    /// Endpoint of the local S3-compatible server used by `minio_storage`.
    const MINIO_ENDPOINT: &str = "http://localhost:9000";

    #[tokio::test]
    #[ignore = "requires AWS credentials and an existing `test-bucket`"]
    async fn s3_storage_basic() {
        let storage = S3Storage::new(TEST_BUCKET.to_string(), Some(TEST_PREFIX.to_string()))
            .await
            .expect("building S3Storage from the default AWS config");

        storage
            .verify_bucket()
            .await
            .expect("test bucket should be reachable with the default AWS config");
    }

    #[tokio::test]
    #[ignore = "requires an S3-compatible server (e.g. MinIO) on localhost:9000"]
    async fn minio_storage() {
        let storage = S3Storage::new_with_endpoint(
            TEST_BUCKET.to_string(),
            Some(TEST_PREFIX.to_string()),
            MINIO_ENDPOINT.to_string(),
        )
        .await
        .expect("building S3Storage for the local endpoint");

        storage
            .verify_bucket()
            .await
            .expect("test bucket should exist on the local S3-compatible server");
    }
}