1use crate::{Error, Result};
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::path::{Path, PathBuf};
14use tracing::{debug, warn};
15
16#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
18#[serde(rename_all = "lowercase")]
19pub enum DataSourceType {
20 Local,
22 Git,
24 Http,
26}
27
28#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct DataSourceConfig {
31 #[serde(rename = "type")]
33 pub source_type: DataSourceType,
34 pub location: String,
36 #[serde(skip_serializing_if = "Option::is_none")]
38 pub branch: Option<String>,
39 #[serde(skip_serializing_if = "Option::is_none")]
41 pub auth_token: Option<String>,
42 #[serde(skip_serializing_if = "Option::is_none")]
44 pub path: Option<String>,
45 #[serde(skip_serializing_if = "Option::is_none")]
47 pub cache_dir: Option<PathBuf>,
48 #[serde(skip_serializing_if = "Option::is_none")]
50 pub refresh_interval: Option<u64>,
51}
52
/// Payload returned by a data source, together with descriptive metadata.
#[derive(Clone, Debug)]
pub struct DataSourceContent {
    /// Raw bytes of the loaded resource.
    pub content: Vec<u8>,
    /// MIME type of `content`, when the source was able to determine one.
    pub content_type: Option<String>,
    /// Free-form key/value details about where and how the content was obtained.
    pub metadata: HashMap<String, String>,
}
63
64#[async_trait::async_trait]
66pub trait DataSource: Send + Sync {
67 async fn load(&self) -> Result<DataSourceContent>;
69
70 async fn check_updated(&self) -> Result<bool>;
72
73 fn source_type(&self) -> DataSourceType;
75}
76
/// Data source backed by a single file on the local filesystem.
pub struct LocalDataSource {
    /// Location of the file to read.
    path: PathBuf,
}

impl LocalDataSource {
    /// Creates a source for the file at `path`.
    ///
    /// The path is only stored; the filesystem is not touched until the source is loaded.
    pub fn new(path: impl AsRef<Path>) -> Self {
        let path = PathBuf::from(path.as_ref());
        Self { path }
    }
}
90
91#[async_trait::async_trait]
92impl DataSource for LocalDataSource {
93 async fn load(&self) -> Result<DataSourceContent> {
94 debug!("Loading data from local file: {}", self.path.display());
95
96 let content = tokio::fs::read(&self.path).await.map_err(|e| {
97 Error::io_with_context(
98 format!("reading local file {}", self.path.display()),
99 e.to_string(),
100 )
101 })?;
102
103 let content_type =
104 self.path.extension().and_then(|ext| ext.to_str()).map(|ext| match ext {
105 "json" => "application/json".to_string(),
106 "yaml" | "yml" => "application/x-yaml".to_string(),
107 "xml" => "application/xml".to_string(),
108 "csv" => "text/csv".to_string(),
109 _ => format!("text/{}", ext),
110 });
111
112 let mut metadata = HashMap::new();
113 if let Ok(metadata_info) = tokio::fs::metadata(&self.path).await {
114 metadata.insert("size".to_string(), metadata_info.len().to_string());
115 if let Ok(modified) = metadata_info.modified() {
116 metadata.insert("modified".to_string(), format!("{:?}", modified));
117 }
118 }
119 metadata.insert("path".to_string(), self.path.display().to_string());
120
121 Ok(DataSourceContent {
122 content,
123 content_type,
124 metadata,
125 })
126 }
127
128 async fn check_updated(&self) -> Result<bool> {
129 Ok(true)
133 }
134
135 fn source_type(&self) -> DataSourceType {
136 DataSourceType::Local
137 }
138}
139
140pub struct GitDataSource {
142 config: DataSourceConfig,
143 repo_path: PathBuf,
144}
145
146impl GitDataSource {
147 pub fn new(config: DataSourceConfig) -> Result<Self> {
149 let repo_name = Self::extract_repo_name(&config.location)?;
151 let cache_dir = config
152 .cache_dir
153 .clone()
154 .unwrap_or_else(|| PathBuf::from("./.mockforge-data-cache"));
155 let repo_path = cache_dir.join(repo_name);
156
157 Ok(Self { config, repo_path })
158 }
159
160 fn extract_repo_name(url: &str) -> Result<String> {
162 let name = if let Some(stripped) = url.strip_suffix(".git") {
163 stripped
164 } else {
165 url
166 };
167
168 let parts: Vec<&str> = name.split('/').collect();
169 if let Some(last) = parts.last() {
170 let clean = last.split('?').next().unwrap_or(last);
171 Ok(clean.to_string())
172 } else {
173 Err(Error::config(format!("Invalid Git repository URL: {}", url)))
174 }
175 }
176
177 async fn ensure_repo(&self) -> Result<()> {
179 use std::process::Command;
180
181 if let Some(parent) = self.repo_path.parent() {
183 tokio::fs::create_dir_all(parent)
184 .await
185 .map_err(|e| Error::io_with_context("creating cache directory", e.to_string()))?;
186 }
187
188 if self.repo_path.exists() {
189 debug!("Updating Git repository: {}", self.repo_path.display());
191 let branch = self.config.branch.as_deref().unwrap_or("main");
192 let repo_path_str = self.repo_path.to_str().unwrap();
193
194 let output = Command::new("git")
196 .args(["-C", repo_path_str, "fetch", "origin", branch])
197 .output()
198 .map_err(|e| Error::io_with_context("git fetch", e.to_string()))?;
199
200 if !output.status.success() {
201 warn!("Git fetch failed, continuing anyway");
202 }
203
204 let output = Command::new("git")
206 .args([
207 "-C",
208 repo_path_str,
209 "reset",
210 "--hard",
211 &format!("origin/{}", branch),
212 ])
213 .output()
214 .map_err(|e| Error::io_with_context("git reset", e.to_string()))?;
215
216 if !output.status.success() {
217 let stderr = String::from_utf8_lossy(&output.stderr);
218 return Err(Error::io_with_context("git reset", stderr.to_string()));
219 }
220 } else {
221 debug!("Cloning Git repository: {}", self.config.location);
223 let url = if let Some(ref token) = self.config.auth_token {
224 Self::inject_auth_token(&self.config.location, token)?
225 } else {
226 self.config.location.clone()
227 };
228
229 let branch = self.config.branch.as_deref().unwrap_or("main");
230 let repo_path_str = self.repo_path.to_str().unwrap();
231
232 let output = Command::new("git")
233 .args([
234 "clone",
235 "--branch",
236 branch,
237 "--depth",
238 "1",
239 &url,
240 repo_path_str,
241 ])
242 .output()
243 .map_err(|e| Error::io_with_context("git clone", e.to_string()))?;
244
245 if !output.status.success() {
246 let stderr = String::from_utf8_lossy(&output.stderr);
247 return Err(Error::io_with_context("git clone", stderr.to_string()));
248 }
249 }
250
251 Ok(())
252 }
253
254 fn inject_auth_token(url: &str, token: &str) -> Result<String> {
256 if url.starts_with("https://") {
257 if let Some(rest) = url.strip_prefix("https://") {
258 return Ok(format!("https://{}@{}", token, rest));
259 }
260 }
261 if url.contains('@') {
262 warn!("SSH URL detected. Token authentication may not work.");
263 }
264 Ok(url.to_string())
265 }
266}
267
268#[async_trait::async_trait]
269impl DataSource for GitDataSource {
270 async fn load(&self) -> Result<DataSourceContent> {
271 self.ensure_repo().await?;
273
274 let file_path = if let Some(ref path) = self.config.path {
276 self.repo_path.join(path)
277 } else {
278 return Err(Error::config(
279 "Git data source requires a 'path' to specify the file within the repository"
280 .to_string(),
281 ));
282 };
283
284 if !file_path.exists() {
285 return Err(Error::not_found(
286 "file in Git repository".to_string(),
287 file_path.display().to_string(),
288 ));
289 }
290
291 let content = tokio::fs::read(&file_path).await.map_err(|e| {
293 Error::io_with_context("reading file from Git repository", e.to_string())
294 })?;
295
296 let content_type =
297 file_path.extension().and_then(|ext| ext.to_str()).map(|ext| match ext {
298 "json" => "application/json".to_string(),
299 "yaml" | "yml" => "application/x-yaml".to_string(),
300 _ => format!("text/{}", ext),
301 });
302
303 let mut metadata = HashMap::new();
304 metadata.insert("source".to_string(), "git".to_string());
305 metadata.insert("repository".to_string(), self.config.location.clone());
306 if let Some(ref branch) = self.config.branch {
307 metadata.insert("branch".to_string(), branch.clone());
308 }
309 metadata.insert("path".to_string(), file_path.display().to_string());
310
311 use std::process::Command;
313 if let Ok(output) = Command::new("git")
314 .args(["-C", self.repo_path.to_str().unwrap(), "rev-parse", "HEAD"])
315 .output()
316 {
317 if output.status.success() {
318 if let Ok(commit) = String::from_utf8(output.stdout) {
319 metadata.insert("commit".to_string(), commit.trim().to_string());
320 }
321 }
322 }
323
324 Ok(DataSourceContent {
325 content,
326 content_type,
327 metadata,
328 })
329 }
330
331 async fn check_updated(&self) -> Result<bool> {
332 use std::process::Command;
334
335 let branch = self.config.branch.as_deref().unwrap_or("main");
336 let repo_path_str = self.repo_path.to_str().unwrap();
337
338 let _output = Command::new("git")
340 .args(["-C", repo_path_str, "fetch", "origin", branch])
341 .output();
342
343 let output = Command::new("git")
345 .args([
346 "-C",
347 repo_path_str,
348 "rev-list",
349 "--count",
350 &format!("HEAD..origin/{}", branch),
351 ])
352 .output()
353 .map_err(|e| Error::io_with_context("checking git for updates", e.to_string()))?;
354
355 if output.status.success() {
356 if let Ok(count_str) = String::from_utf8(output.stdout) {
357 if let Ok(count) = count_str.trim().parse::<u32>() {
358 return Ok(count > 0);
359 }
360 }
361 }
362
363 Ok(false)
364 }
365
366 fn source_type(&self) -> DataSourceType {
367 DataSourceType::Git
368 }
369}
370
371pub struct HttpDataSource {
373 url: String,
374 auth_token: Option<String>,
375 refresh_interval: Option<u64>,
376 last_fetch: std::sync::Arc<std::sync::Mutex<Option<std::time::Instant>>>,
377 cached_content: std::sync::Arc<std::sync::Mutex<Option<DataSourceContent>>>,
378}
379
380impl HttpDataSource {
381 pub fn new(config: DataSourceConfig) -> Self {
383 Self {
384 url: config.location.clone(),
385 auth_token: config.auth_token.clone(),
386 refresh_interval: config.refresh_interval,
387 last_fetch: std::sync::Arc::new(std::sync::Mutex::new(None)),
388 cached_content: std::sync::Arc::new(std::sync::Mutex::new(None)),
389 }
390 }
391
392 async fn fetch(&self) -> Result<DataSourceContent> {
394 let client = reqwest::Client::new();
395
396 let mut request = client.get(&self.url);
398 if let Some(ref token) = self.auth_token {
399 request = request.bearer_auth(token);
400 }
401
402 let response = request.send().await.map_err(|e| {
403 Error::internal(format!("Failed to fetch data from {}: {}", self.url, e))
404 })?;
405
406 let status = response.status();
408 let status_code = status.as_u16();
409
410 if !status.is_success() {
411 return Err(Error::internal(format!("HTTP request failed with status {}", status)));
412 }
413
414 let content_type = response
415 .headers()
416 .get("content-type")
417 .and_then(|h| h.to_str().ok())
418 .map(|s| s.to_string());
419
420 let content = response
421 .bytes()
422 .await
423 .map_err(|e| Error::internal(format!("Failed to read response body: {}", e)))?
424 .to_vec();
425
426 let mut metadata = HashMap::new();
427 metadata.insert("source".to_string(), "http".to_string());
428 metadata.insert("url".to_string(), self.url.clone());
429 metadata.insert("status".to_string(), status_code.to_string());
430 if let Some(content_type) = &content_type {
431 metadata.insert("content_type".to_string(), content_type.clone());
432 }
433
434 Ok(DataSourceContent {
435 content,
436 content_type,
437 metadata,
438 })
439 }
440}
441
442#[async_trait::async_trait]
443impl DataSource for HttpDataSource {
444 async fn load(&self) -> Result<DataSourceContent> {
445 {
447 let cached_guard =
448 self.cached_content.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
449 let last_fetch_guard =
450 self.last_fetch.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
451
452 if let (Some(cached), Some(last_fetch), Some(refresh_interval)) =
453 (cached_guard.as_ref(), last_fetch_guard.as_ref(), self.refresh_interval)
454 {
455 if last_fetch.elapsed().as_secs() < refresh_interval {
456 debug!("Using cached HTTP data");
457 return Ok(cached.clone());
458 }
459 }
460 }
461
462 let content = self.fetch().await?;
464 {
465 let mut last_fetch =
466 self.last_fetch.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
467 let mut cached =
468 self.cached_content.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
469 *last_fetch = Some(std::time::Instant::now());
470 *cached = Some(content.clone());
471 }
472
473 Ok(content)
474 }
475
476 async fn check_updated(&self) -> Result<bool> {
477 let last_fetch = self.last_fetch.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
479 if let (Some(last_fetch), Some(refresh_interval)) =
480 (last_fetch.as_ref(), self.refresh_interval)
481 {
482 Ok(last_fetch.elapsed().as_secs() >= refresh_interval)
483 } else {
484 Ok(true)
486 }
487 }
488
489 fn source_type(&self) -> DataSourceType {
490 DataSourceType::Http
491 }
492}
493
494pub struct DataSourceFactory;
496
497impl DataSourceFactory {
498 pub fn create(config: DataSourceConfig) -> Result<Box<dyn DataSource + Send + Sync>> {
500 match config.source_type {
501 DataSourceType::Local => Ok(Box::new(LocalDataSource::new(&config.location))),
502 DataSourceType::Git => {
503 let git_source = GitDataSource::new(config)?;
504 Ok(Box::new(git_source))
505 }
506 DataSourceType::Http => Ok(Box::new(HttpDataSource::new(config))),
507 }
508 }
509}
510
511pub struct DataSourceManager {
513 sources: HashMap<String, std::sync::Arc<dyn DataSource + Send + Sync>>,
514}
515
516impl DataSourceManager {
517 pub fn new() -> Self {
519 Self {
520 sources: HashMap::new(),
521 }
522 }
523
524 pub fn register(&mut self, name: String, source: Box<dyn DataSource + Send + Sync>) {
526 self.sources.insert(name, std::sync::Arc::from(source));
527 }
528
529 pub async fn load(&self, name: &str) -> Result<DataSourceContent> {
531 let source = self.sources.get(name).ok_or_else(|| Error::not_found("data source", name))?;
532
533 source.load().await
534 }
535
536 pub async fn check_updated(&self, name: &str) -> Result<bool> {
538 let source = self.sources.get(name).ok_or_else(|| Error::not_found("data source", name))?;
539
540 source.check_updated().await
541 }
542
543 pub fn list_sources(&self) -> Vec<String> {
545 self.sources.keys().cloned().collect()
546 }
547}
548
549impl Default for DataSourceManager {
550 fn default() -> Self {
551 Self::new()
552 }
553}
554
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a config of the given type and location with every optional field unset.
    fn bare_config(source_type: DataSourceType, location: &str) -> DataSourceConfig {
        DataSourceConfig {
            source_type,
            location: location.to_string(),
            branch: None,
            auth_token: None,
            path: None,
            cache_dir: None,
            refresh_interval: None,
        }
    }

    /// A local source reports the `Local` type.
    #[test]
    fn test_local_data_source_creation() {
        let source = LocalDataSource::new("./test.json");
        assert_eq!(source.source_type(), DataSourceType::Local);
    }

    /// A Git source can be built from a typical HTTPS repository config.
    #[test]
    fn test_git_data_source_config() {
        let config = DataSourceConfig {
            branch: Some("main".to_string()),
            path: Some("data/test.json".to_string()),
            ..bare_config(DataSourceType::Git, "https://github.com/user/repo.git")
        };

        let source = GitDataSource::new(config).unwrap();
        assert_eq!(source.source_type(), DataSourceType::Git);
    }

    /// An HTTP source can be built with a token and a refresh interval.
    #[test]
    fn test_http_data_source_config() {
        let config = DataSourceConfig {
            auth_token: Some("token123".to_string()),
            refresh_interval: Some(60),
            ..bare_config(DataSourceType::Http, "https://api.example.com/data.json")
        };

        let source = HttpDataSource::new(config);
        assert_eq!(source.source_type(), DataSourceType::Http);
    }

    /// The factory picks the backend named by the config's source type.
    #[test]
    fn test_data_source_factory() {
        let local_config = bare_config(DataSourceType::Local, "./test.json");

        let source = DataSourceFactory::create(local_config).unwrap();
        assert_eq!(source.source_type(), DataSourceType::Local);
    }
}