Skip to main content

mockforge_core/
data_source.rs

//! Data Source Abstraction
//!
//! Provides a unified interface for loading test data from various sources:
//! - Local filesystem
//! - Git repositories
//! - HTTP/HTTPS endpoints
//!
//! This lets mocks draw their test data from any combination of these sources.
9
10use crate::{Error, Result};
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::path::{Path, PathBuf};
14use tracing::{debug, warn};
15
16/// Data source type
17#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
18#[serde(rename_all = "lowercase")]
19pub enum DataSourceType {
20    /// Local filesystem
21    Local,
22    /// Git repository
23    Git,
24    /// HTTP/HTTPS endpoint
25    Http,
26}
27
28/// Configuration for a data source
29#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct DataSourceConfig {
31    /// Type of data source
32    #[serde(rename = "type")]
33    pub source_type: DataSourceType,
34    /// Source location (path, URL, or Git repo URL)
35    pub location: String,
36    /// Optional branch/tag for Git sources
37    #[serde(skip_serializing_if = "Option::is_none")]
38    pub branch: Option<String>,
39    /// Optional authentication token
40    #[serde(skip_serializing_if = "Option::is_none")]
41    pub auth_token: Option<String>,
42    /// Optional path within the source (for Git repos or subdirectories)
43    #[serde(skip_serializing_if = "Option::is_none")]
44    pub path: Option<String>,
45    /// Optional cache directory for Git sources
46    #[serde(skip_serializing_if = "Option::is_none")]
47    pub cache_dir: Option<PathBuf>,
48    /// Optional refresh interval in seconds (for HTTP sources)
49    #[serde(skip_serializing_if = "Option::is_none")]
50    pub refresh_interval: Option<u64>,
51}
52
/// Payload returned by [`DataSource::load`], along with what is known about it.
#[derive(Clone, Debug)]
pub struct DataSourceContent {
    /// Bytes exactly as read from the source.
    pub content: Vec<u8>,
    /// MIME type, when the source reports or implies one.
    pub content_type: Option<String>,
    /// Free-form key/value details describing where the payload came from.
    pub metadata: HashMap<String, String>,
}
63
64/// Trait for data source implementations
65#[async_trait::async_trait]
66pub trait DataSource: Send + Sync {
67    /// Load data from the source
68    async fn load(&self) -> Result<DataSourceContent>;
69
70    /// Check if the source has been updated (for caching)
71    async fn check_updated(&self) -> Result<bool>;
72
73    /// Get the source type
74    fn source_type(&self) -> DataSourceType;
75}
76
/// Data source backed by a single file on the local filesystem.
pub struct LocalDataSource {
    path: PathBuf,
}

impl LocalDataSource {
    /// Build a source that reads from `path`.
    ///
    /// The path is only stored here; nothing is read or checked until `load`.
    pub fn new(path: impl AsRef<Path>) -> Self {
        let path = PathBuf::from(path.as_ref());
        Self { path }
    }
}
90
91#[async_trait::async_trait]
92impl DataSource for LocalDataSource {
93    async fn load(&self) -> Result<DataSourceContent> {
94        debug!("Loading data from local file: {}", self.path.display());
95
96        let content = tokio::fs::read(&self.path).await.map_err(|e| {
97            Error::io_with_context(
98                format!("reading local file {}", self.path.display()),
99                e.to_string(),
100            )
101        })?;
102
103        let content_type =
104            self.path.extension().and_then(|ext| ext.to_str()).map(|ext| match ext {
105                "json" => "application/json".to_string(),
106                "yaml" | "yml" => "application/x-yaml".to_string(),
107                "xml" => "application/xml".to_string(),
108                "csv" => "text/csv".to_string(),
109                _ => format!("text/{}", ext),
110            });
111
112        let mut metadata = HashMap::new();
113        if let Ok(metadata_info) = tokio::fs::metadata(&self.path).await {
114            metadata.insert("size".to_string(), metadata_info.len().to_string());
115            if let Ok(modified) = metadata_info.modified() {
116                metadata.insert("modified".to_string(), format!("{:?}", modified));
117            }
118        }
119        metadata.insert("path".to_string(), self.path.display().to_string());
120
121        Ok(DataSourceContent {
122            content,
123            content_type,
124            metadata,
125        })
126    }
127
128    async fn check_updated(&self) -> Result<bool> {
129        // For local files, we can check modification time
130        // This is a simple implementation - always returns true
131        // A more sophisticated version could track modification times
132        Ok(true)
133    }
134
135    fn source_type(&self) -> DataSourceType {
136        DataSourceType::Local
137    }
138}
139
140/// Git repository data source
141pub struct GitDataSource {
142    config: DataSourceConfig,
143    repo_path: PathBuf,
144}
145
146impl GitDataSource {
147    /// Create a new Git data source
148    pub fn new(config: DataSourceConfig) -> Result<Self> {
149        // Extract repo name from URL
150        let repo_name = Self::extract_repo_name(&config.location)?;
151        let cache_dir = config
152            .cache_dir
153            .clone()
154            .unwrap_or_else(|| PathBuf::from("./.mockforge-data-cache"));
155        let repo_path = cache_dir.join(repo_name);
156
157        Ok(Self { config, repo_path })
158    }
159
160    /// Extract repository name from URL
161    fn extract_repo_name(url: &str) -> Result<String> {
162        let name = if let Some(stripped) = url.strip_suffix(".git") {
163            stripped
164        } else {
165            url
166        };
167
168        let parts: Vec<&str> = name.split('/').collect();
169        if let Some(last) = parts.last() {
170            let clean = last.split('?').next().unwrap_or(last);
171            Ok(clean.to_string())
172        } else {
173            Err(Error::config(format!("Invalid Git repository URL: {}", url)))
174        }
175    }
176
177    /// Initialize or update the repository
178    async fn ensure_repo(&self) -> Result<()> {
179        use std::process::Command;
180
181        // Create cache directory if needed
182        if let Some(parent) = self.repo_path.parent() {
183            tokio::fs::create_dir_all(parent)
184                .await
185                .map_err(|e| Error::io_with_context("creating cache directory", e.to_string()))?;
186        }
187
188        if self.repo_path.exists() {
189            // Update existing repository
190            debug!("Updating Git repository: {}", self.repo_path.display());
191            let branch = self.config.branch.as_deref().unwrap_or("main");
192            let repo_path_str = self.repo_path.to_str().unwrap();
193
194            // Fetch latest changes
195            let output = Command::new("git")
196                .args(["-C", repo_path_str, "fetch", "origin", branch])
197                .output()
198                .map_err(|e| Error::io_with_context("git fetch", e.to_string()))?;
199
200            if !output.status.success() {
201                warn!("Git fetch failed, continuing anyway");
202            }
203
204            // Reset to remote branch
205            let output = Command::new("git")
206                .args([
207                    "-C",
208                    repo_path_str,
209                    "reset",
210                    "--hard",
211                    &format!("origin/{}", branch),
212                ])
213                .output()
214                .map_err(|e| Error::io_with_context("git reset", e.to_string()))?;
215
216            if !output.status.success() {
217                let stderr = String::from_utf8_lossy(&output.stderr);
218                return Err(Error::io_with_context("git reset", stderr.to_string()));
219            }
220        } else {
221            // Clone repository
222            debug!("Cloning Git repository: {}", self.config.location);
223            let url = if let Some(ref token) = self.config.auth_token {
224                Self::inject_auth_token(&self.config.location, token)?
225            } else {
226                self.config.location.clone()
227            };
228
229            let branch = self.config.branch.as_deref().unwrap_or("main");
230            let repo_path_str = self.repo_path.to_str().unwrap();
231
232            let output = Command::new("git")
233                .args([
234                    "clone",
235                    "--branch",
236                    branch,
237                    "--depth",
238                    "1",
239                    &url,
240                    repo_path_str,
241                ])
242                .output()
243                .map_err(|e| Error::io_with_context("git clone", e.to_string()))?;
244
245            if !output.status.success() {
246                let stderr = String::from_utf8_lossy(&output.stderr);
247                return Err(Error::io_with_context("git clone", stderr.to_string()));
248            }
249        }
250
251        Ok(())
252    }
253
254    /// Inject authentication token into URL
255    fn inject_auth_token(url: &str, token: &str) -> Result<String> {
256        if url.starts_with("https://") {
257            if let Some(rest) = url.strip_prefix("https://") {
258                return Ok(format!("https://{}@{}", token, rest));
259            }
260        }
261        if url.contains('@') {
262            warn!("SSH URL detected. Token authentication may not work.");
263        }
264        Ok(url.to_string())
265    }
266}
267
268#[async_trait::async_trait]
269impl DataSource for GitDataSource {
270    async fn load(&self) -> Result<DataSourceContent> {
271        // Ensure repository is cloned/updated
272        self.ensure_repo().await?;
273
274        // Determine file path
275        let file_path = if let Some(ref path) = self.config.path {
276            self.repo_path.join(path)
277        } else {
278            return Err(Error::config(
279                "Git data source requires a 'path' to specify the file within the repository"
280                    .to_string(),
281            ));
282        };
283
284        if !file_path.exists() {
285            return Err(Error::not_found(
286                "file in Git repository".to_string(),
287                file_path.display().to_string(),
288            ));
289        }
290
291        // Load file content
292        let content = tokio::fs::read(&file_path).await.map_err(|e| {
293            Error::io_with_context("reading file from Git repository", e.to_string())
294        })?;
295
296        let content_type =
297            file_path.extension().and_then(|ext| ext.to_str()).map(|ext| match ext {
298                "json" => "application/json".to_string(),
299                "yaml" | "yml" => "application/x-yaml".to_string(),
300                _ => format!("text/{}", ext),
301            });
302
303        let mut metadata = HashMap::new();
304        metadata.insert("source".to_string(), "git".to_string());
305        metadata.insert("repository".to_string(), self.config.location.clone());
306        if let Some(ref branch) = self.config.branch {
307            metadata.insert("branch".to_string(), branch.clone());
308        }
309        metadata.insert("path".to_string(), file_path.display().to_string());
310
311        // Get commit hash
312        use std::process::Command;
313        if let Ok(output) = Command::new("git")
314            .args(["-C", self.repo_path.to_str().unwrap(), "rev-parse", "HEAD"])
315            .output()
316        {
317            if output.status.success() {
318                if let Ok(commit) = String::from_utf8(output.stdout) {
319                    metadata.insert("commit".to_string(), commit.trim().to_string());
320                }
321            }
322        }
323
324        Ok(DataSourceContent {
325            content,
326            content_type,
327            metadata,
328        })
329    }
330
331    async fn check_updated(&self) -> Result<bool> {
332        // Check if remote has new commits
333        use std::process::Command;
334
335        let branch = self.config.branch.as_deref().unwrap_or("main");
336        let repo_path_str = self.repo_path.to_str().unwrap();
337
338        // Fetch without updating
339        let _output = Command::new("git")
340            .args(["-C", repo_path_str, "fetch", "origin", branch])
341            .output();
342
343        // Compare local and remote
344        let output = Command::new("git")
345            .args([
346                "-C",
347                repo_path_str,
348                "rev-list",
349                "--count",
350                &format!("HEAD..origin/{}", branch),
351            ])
352            .output()
353            .map_err(|e| Error::io_with_context("checking git for updates", e.to_string()))?;
354
355        if output.status.success() {
356            if let Ok(count_str) = String::from_utf8(output.stdout) {
357                if let Ok(count) = count_str.trim().parse::<u32>() {
358                    return Ok(count > 0);
359                }
360            }
361        }
362
363        Ok(false)
364    }
365
366    fn source_type(&self) -> DataSourceType {
367        DataSourceType::Git
368    }
369}
370
371/// HTTP/HTTPS data source
372pub struct HttpDataSource {
373    url: String,
374    auth_token: Option<String>,
375    refresh_interval: Option<u64>,
376    last_fetch: std::sync::Arc<std::sync::Mutex<Option<std::time::Instant>>>,
377    cached_content: std::sync::Arc<std::sync::Mutex<Option<DataSourceContent>>>,
378}
379
380impl HttpDataSource {
381    /// Create a new HTTP data source
382    pub fn new(config: DataSourceConfig) -> Self {
383        Self {
384            url: config.location.clone(),
385            auth_token: config.auth_token.clone(),
386            refresh_interval: config.refresh_interval,
387            last_fetch: std::sync::Arc::new(std::sync::Mutex::new(None)),
388            cached_content: std::sync::Arc::new(std::sync::Mutex::new(None)),
389        }
390    }
391
392    /// Fetch data from HTTP endpoint
393    async fn fetch(&self) -> Result<DataSourceContent> {
394        let client = reqwest::Client::new();
395
396        // Add authentication if provided
397        let mut request = client.get(&self.url);
398        if let Some(ref token) = self.auth_token {
399            request = request.bearer_auth(token);
400        }
401
402        let response = request.send().await.map_err(|e| {
403            Error::internal(format!("Failed to fetch data from {}: {}", self.url, e))
404        })?;
405
406        // Extract status and content type before consuming the response
407        let status = response.status();
408        let status_code = status.as_u16();
409
410        if !status.is_success() {
411            return Err(Error::internal(format!("HTTP request failed with status {}", status)));
412        }
413
414        let content_type = response
415            .headers()
416            .get("content-type")
417            .and_then(|h| h.to_str().ok())
418            .map(|s| s.to_string());
419
420        let content = response
421            .bytes()
422            .await
423            .map_err(|e| Error::internal(format!("Failed to read response body: {}", e)))?
424            .to_vec();
425
426        let mut metadata = HashMap::new();
427        metadata.insert("source".to_string(), "http".to_string());
428        metadata.insert("url".to_string(), self.url.clone());
429        metadata.insert("status".to_string(), status_code.to_string());
430        if let Some(content_type) = &content_type {
431            metadata.insert("content_type".to_string(), content_type.clone());
432        }
433
434        Ok(DataSourceContent {
435            content,
436            content_type,
437            metadata,
438        })
439    }
440}
441
442#[async_trait::async_trait]
443impl DataSource for HttpDataSource {
444    async fn load(&self) -> Result<DataSourceContent> {
445        // Check if we should use cached content
446        {
447            let cached_guard =
448                self.cached_content.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
449            let last_fetch_guard =
450                self.last_fetch.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
451
452            if let (Some(cached), Some(last_fetch), Some(refresh_interval)) =
453                (cached_guard.as_ref(), last_fetch_guard.as_ref(), self.refresh_interval)
454            {
455                if last_fetch.elapsed().as_secs() < refresh_interval {
456                    debug!("Using cached HTTP data");
457                    return Ok(cached.clone());
458                }
459            }
460        }
461
462        // Fetch fresh data
463        let content = self.fetch().await?;
464        {
465            let mut last_fetch =
466                self.last_fetch.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
467            let mut cached =
468                self.cached_content.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
469            *last_fetch = Some(std::time::Instant::now());
470            *cached = Some(content.clone());
471        }
472
473        Ok(content)
474    }
475
476    async fn check_updated(&self) -> Result<bool> {
477        // For HTTP sources, we check if cache is expired
478        let last_fetch = self.last_fetch.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
479        if let (Some(last_fetch), Some(refresh_interval)) =
480            (last_fetch.as_ref(), self.refresh_interval)
481        {
482            Ok(last_fetch.elapsed().as_secs() >= refresh_interval)
483        } else {
484            // No cache, always consider updated
485            Ok(true)
486        }
487    }
488
489    fn source_type(&self) -> DataSourceType {
490        DataSourceType::Http
491    }
492}
493
494/// Data source factory
495pub struct DataSourceFactory;
496
497impl DataSourceFactory {
498    /// Create a data source from configuration
499    pub fn create(config: DataSourceConfig) -> Result<Box<dyn DataSource + Send + Sync>> {
500        match config.source_type {
501            DataSourceType::Local => Ok(Box::new(LocalDataSource::new(&config.location))),
502            DataSourceType::Git => {
503                let git_source = GitDataSource::new(config)?;
504                Ok(Box::new(git_source))
505            }
506            DataSourceType::Http => Ok(Box::new(HttpDataSource::new(config))),
507        }
508    }
509}
510
511/// Data source manager for handling multiple sources
512pub struct DataSourceManager {
513    sources: HashMap<String, std::sync::Arc<dyn DataSource + Send + Sync>>,
514}
515
516impl DataSourceManager {
517    /// Create a new data source manager
518    pub fn new() -> Self {
519        Self {
520            sources: HashMap::new(),
521        }
522    }
523
524    /// Register a data source
525    pub fn register(&mut self, name: String, source: Box<dyn DataSource + Send + Sync>) {
526        self.sources.insert(name, std::sync::Arc::from(source));
527    }
528
529    /// Load data from a named source
530    pub async fn load(&self, name: &str) -> Result<DataSourceContent> {
531        let source = self.sources.get(name).ok_or_else(|| Error::not_found("data source", name))?;
532
533        source.load().await
534    }
535
536    /// Check if a source has been updated
537    pub async fn check_updated(&self, name: &str) -> Result<bool> {
538        let source = self.sources.get(name).ok_or_else(|| Error::not_found("data source", name))?;
539
540        source.check_updated().await
541    }
542
543    /// List all registered sources
544    pub fn list_sources(&self) -> Vec<String> {
545        self.sources.keys().cloned().collect()
546    }
547}
548
549impl Default for DataSourceManager {
550    fn default() -> Self {
551        Self::new()
552    }
553}
554
#[cfg(test)]
mod tests {
    use super::*;

    /// Config of the given kind with every optional field unset.
    fn bare_config(source_type: DataSourceType, location: &str) -> DataSourceConfig {
        DataSourceConfig {
            source_type,
            location: location.to_string(),
            branch: None,
            auth_token: None,
            path: None,
            cache_dir: None,
            refresh_interval: None,
        }
    }

    #[test]
    fn test_local_data_source_creation() {
        let source = LocalDataSource::new("./test.json");
        assert_eq!(source.source_type(), DataSourceType::Local);
    }

    #[test]
    fn test_git_data_source_config() {
        let config = DataSourceConfig {
            branch: Some("main".to_string()),
            path: Some("data/test.json".to_string()),
            ..bare_config(DataSourceType::Git, "https://github.com/user/repo.git")
        };

        let source = GitDataSource::new(config).expect("valid Git repository URL");
        assert_eq!(source.source_type(), DataSourceType::Git);
    }

    #[test]
    fn test_http_data_source_config() {
        let config = DataSourceConfig {
            auth_token: Some("token123".to_string()),
            refresh_interval: Some(60),
            ..bare_config(DataSourceType::Http, "https://api.example.com/data.json")
        };

        let source = HttpDataSource::new(config);
        assert_eq!(source.source_type(), DataSourceType::Http);
    }

    #[test]
    fn test_data_source_factory() {
        let local_config = bare_config(DataSourceType::Local, "./test.json");

        let source = DataSourceFactory::create(local_config).expect("local source is infallible");
        assert_eq!(source.source_type(), DataSourceType::Local);
    }
}