1use crate::{Error, Result};
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::path::{Path, PathBuf};
14use tracing::{debug, error, info, warn};
15
16#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
18#[serde(rename_all = "lowercase")]
19pub enum DataSourceType {
20 Local,
22 Git,
24 Http,
26}
27
28#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct DataSourceConfig {
31 #[serde(rename = "type")]
33 pub source_type: DataSourceType,
34 pub location: String,
36 #[serde(skip_serializing_if = "Option::is_none")]
38 pub branch: Option<String>,
39 #[serde(skip_serializing_if = "Option::is_none")]
41 pub auth_token: Option<String>,
42 #[serde(skip_serializing_if = "Option::is_none")]
44 pub path: Option<String>,
45 #[serde(skip_serializing_if = "Option::is_none")]
47 pub cache_dir: Option<PathBuf>,
48 #[serde(skip_serializing_if = "Option::is_none")]
50 pub refresh_interval: Option<u64>,
51}
52
/// The payload produced by loading a data source.
///
/// Derives `Default`, `PartialEq` and `Eq` in addition to `Debug`/`Clone` so
/// that loaded content can be built incrementally and compared directly (for
/// example to detect whether a reload actually changed anything).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct DataSourceContent {
    /// Raw bytes exactly as read from the source.
    pub content: Vec<u8>,
    /// MIME type of `content`, when the source could determine one.
    pub content_type: Option<String>,
    /// Free-form string key/value details about where the content came from;
    /// the keys present depend on the source that produced it.
    pub metadata: HashMap<String, String>,
}
63
64#[async_trait::async_trait]
66pub trait DataSource: Send + Sync {
67 async fn load(&self) -> Result<DataSourceContent>;
69
70 async fn check_updated(&self) -> Result<bool>;
72
73 fn source_type(&self) -> DataSourceType;
75}
76
/// Data source backed by a single file on the local filesystem.
pub struct LocalDataSource {
    /// Location of the file to read.
    path: PathBuf,
}

impl LocalDataSource {
    /// Creates a source for the file at `path`.
    ///
    /// The path is stored as given; nothing is read or validated here.
    pub fn new(path: impl AsRef<Path>) -> Self {
        let path = PathBuf::from(path.as_ref());
        Self { path }
    }
}
90
91#[async_trait::async_trait]
92impl DataSource for LocalDataSource {
93 async fn load(&self) -> Result<DataSourceContent> {
94 debug!("Loading data from local file: {}", self.path.display());
95
96 let content = tokio::fs::read(&self.path).await.map_err(|e| {
97 Error::generic(format!("Failed to read local file {}: {}", self.path.display(), e))
98 })?;
99
100 let content_type =
101 self.path.extension().and_then(|ext| ext.to_str()).map(|ext| match ext {
102 "json" => "application/json".to_string(),
103 "yaml" | "yml" => "application/x-yaml".to_string(),
104 "xml" => "application/xml".to_string(),
105 "csv" => "text/csv".to_string(),
106 _ => format!("text/{}", ext),
107 });
108
109 let mut metadata = HashMap::new();
110 if let Ok(metadata_info) = tokio::fs::metadata(&self.path).await {
111 metadata.insert("size".to_string(), metadata_info.len().to_string());
112 if let Ok(modified) = metadata_info.modified() {
113 metadata.insert("modified".to_string(), format!("{:?}", modified));
114 }
115 }
116 metadata.insert("path".to_string(), self.path.display().to_string());
117
118 Ok(DataSourceContent {
119 content,
120 content_type,
121 metadata,
122 })
123 }
124
125 async fn check_updated(&self) -> Result<bool> {
126 Ok(true)
130 }
131
132 fn source_type(&self) -> DataSourceType {
133 DataSourceType::Local
134 }
135}
136
137pub struct GitDataSource {
139 config: DataSourceConfig,
140 repo_path: PathBuf,
141}
142
143impl GitDataSource {
144 pub fn new(config: DataSourceConfig) -> Result<Self> {
146 let repo_name = Self::extract_repo_name(&config.location)?;
148 let cache_dir = config
149 .cache_dir
150 .clone()
151 .unwrap_or_else(|| PathBuf::from("./.mockforge-data-cache"));
152 let repo_path = cache_dir.join(repo_name);
153
154 Ok(Self { config, repo_path })
155 }
156
157 fn extract_repo_name(url: &str) -> Result<String> {
159 let name = if url.ends_with(".git") {
160 &url[..url.len() - 4]
161 } else {
162 url
163 };
164
165 let parts: Vec<&str> = name.split('/').collect();
166 if let Some(last) = parts.last() {
167 let clean = last.split('?').next().unwrap_or(last);
168 Ok(clean.to_string())
169 } else {
170 Err(Error::generic(format!("Invalid Git repository URL: {}", url)))
171 }
172 }
173
174 async fn ensure_repo(&self) -> Result<()> {
176 use std::process::Command;
177
178 if let Some(parent) = self.repo_path.parent() {
180 tokio::fs::create_dir_all(parent)
181 .await
182 .map_err(|e| Error::generic(format!("Failed to create cache directory: {}", e)))?;
183 }
184
185 if self.repo_path.exists() {
186 debug!("Updating Git repository: {}", self.repo_path.display());
188 let branch = self.config.branch.as_deref().unwrap_or("main");
189 let repo_path_str = self.repo_path.to_str().unwrap();
190
191 let output = Command::new("git")
193 .args(["-C", repo_path_str, "fetch", "origin", branch])
194 .output()
195 .map_err(|e| Error::generic(format!("Failed to fetch: {}", e)))?;
196
197 if !output.status.success() {
198 warn!("Git fetch failed, continuing anyway");
199 }
200
201 let output = Command::new("git")
203 .args([
204 "-C",
205 repo_path_str,
206 "reset",
207 "--hard",
208 &format!("origin/{}", branch),
209 ])
210 .output()
211 .map_err(|e| Error::generic(format!("Failed to reset: {}", e)))?;
212
213 if !output.status.success() {
214 let stderr = String::from_utf8_lossy(&output.stderr);
215 return Err(Error::generic(format!("Git reset failed: {}", stderr)));
216 }
217 } else {
218 debug!("Cloning Git repository: {}", self.config.location);
220 let url = if let Some(ref token) = self.config.auth_token {
221 Self::inject_auth_token(&self.config.location, token)?
222 } else {
223 self.config.location.clone()
224 };
225
226 let branch = self.config.branch.as_deref().unwrap_or("main");
227 let repo_path_str = self.repo_path.to_str().unwrap();
228
229 let output = Command::new("git")
230 .args([
231 "clone",
232 "--branch",
233 branch,
234 "--depth",
235 "1",
236 &url,
237 repo_path_str,
238 ])
239 .output()
240 .map_err(|e| Error::generic(format!("Failed to clone: {}", e)))?;
241
242 if !output.status.success() {
243 let stderr = String::from_utf8_lossy(&output.stderr);
244 return Err(Error::generic(format!("Git clone failed: {}", stderr)));
245 }
246 }
247
248 Ok(())
249 }
250
251 fn inject_auth_token(url: &str, token: &str) -> Result<String> {
253 if url.starts_with("https://") {
254 if let Some(rest) = url.strip_prefix("https://") {
255 return Ok(format!("https://{}@{}", token, rest));
256 }
257 }
258 if url.contains('@') {
259 warn!("SSH URL detected. Token authentication may not work.");
260 }
261 Ok(url.to_string())
262 }
263}
264
265#[async_trait::async_trait]
266impl DataSource for GitDataSource {
267 async fn load(&self) -> Result<DataSourceContent> {
268 self.ensure_repo().await?;
270
271 let file_path = if let Some(ref path) = self.config.path {
273 self.repo_path.join(path)
274 } else {
275 return Err(Error::generic(
276 "Git data source requires a 'path' to specify the file within the repository"
277 .to_string(),
278 ));
279 };
280
281 if !file_path.exists() {
282 return Err(Error::generic(format!(
283 "File not found in Git repository: {}",
284 file_path.display()
285 )));
286 }
287
288 let content = tokio::fs::read(&file_path).await.map_err(|e| {
290 Error::generic(format!("Failed to read file from Git repository: {}", e))
291 })?;
292
293 let content_type =
294 file_path.extension().and_then(|ext| ext.to_str()).map(|ext| match ext {
295 "json" => "application/json".to_string(),
296 "yaml" | "yml" => "application/x-yaml".to_string(),
297 _ => format!("text/{}", ext),
298 });
299
300 let mut metadata = HashMap::new();
301 metadata.insert("source".to_string(), "git".to_string());
302 metadata.insert("repository".to_string(), self.config.location.clone());
303 if let Some(ref branch) = self.config.branch {
304 metadata.insert("branch".to_string(), branch.clone());
305 }
306 metadata.insert("path".to_string(), file_path.display().to_string());
307
308 use std::process::Command;
310 if let Ok(output) = Command::new("git")
311 .args(["-C", self.repo_path.to_str().unwrap(), "rev-parse", "HEAD"])
312 .output()
313 {
314 if output.status.success() {
315 if let Ok(commit) = String::from_utf8(output.stdout) {
316 metadata.insert("commit".to_string(), commit.trim().to_string());
317 }
318 }
319 }
320
321 Ok(DataSourceContent {
322 content,
323 content_type,
324 metadata,
325 })
326 }
327
328 async fn check_updated(&self) -> Result<bool> {
329 use std::process::Command;
331
332 let branch = self.config.branch.as_deref().unwrap_or("main");
333 let repo_path_str = self.repo_path.to_str().unwrap();
334
335 let _output = Command::new("git")
337 .args(["-C", repo_path_str, "fetch", "origin", branch])
338 .output();
339
340 let output = Command::new("git")
342 .args([
343 "-C",
344 repo_path_str,
345 "rev-list",
346 "--count",
347 &format!("HEAD..origin/{}", branch),
348 ])
349 .output()
350 .map_err(|e| Error::generic(format!("Failed to check for updates: {}", e)))?;
351
352 if output.status.success() {
353 if let Ok(count_str) = String::from_utf8(output.stdout) {
354 if let Ok(count) = count_str.trim().parse::<u32>() {
355 return Ok(count > 0);
356 }
357 }
358 }
359
360 Ok(false)
361 }
362
363 fn source_type(&self) -> DataSourceType {
364 DataSourceType::Git
365 }
366}
367
368pub struct HttpDataSource {
370 url: String,
371 auth_token: Option<String>,
372 refresh_interval: Option<u64>,
373 last_fetch: std::sync::Arc<std::sync::Mutex<Option<std::time::Instant>>>,
374 cached_content: std::sync::Arc<std::sync::Mutex<Option<DataSourceContent>>>,
375}
376
377impl HttpDataSource {
378 pub fn new(config: DataSourceConfig) -> Self {
380 Self {
381 url: config.location.clone(),
382 auth_token: config.auth_token.clone(),
383 refresh_interval: config.refresh_interval,
384 last_fetch: std::sync::Arc::new(std::sync::Mutex::new(None)),
385 cached_content: std::sync::Arc::new(std::sync::Mutex::new(None)),
386 }
387 }
388
389 async fn fetch(&self) -> Result<DataSourceContent> {
391 let client = reqwest::Client::new();
392
393 let mut request = client.get(&self.url);
395 if let Some(ref token) = self.auth_token {
396 request = request.bearer_auth(token);
397 }
398
399 let response = request.send().await.map_err(|e| {
400 Error::generic(format!("Failed to fetch data from {}: {}", self.url, e))
401 })?;
402
403 let status = response.status();
405 let status_code = status.as_u16();
406
407 if !status.is_success() {
408 return Err(Error::generic(format!("HTTP request failed with status {}", status)));
409 }
410
411 let content_type = response
412 .headers()
413 .get("content-type")
414 .and_then(|h| h.to_str().ok())
415 .map(|s| s.to_string());
416
417 let content = response
418 .bytes()
419 .await
420 .map_err(|e| Error::generic(format!("Failed to read response body: {}", e)))?
421 .to_vec();
422
423 let mut metadata = HashMap::new();
424 metadata.insert("source".to_string(), "http".to_string());
425 metadata.insert("url".to_string(), self.url.clone());
426 metadata.insert("status".to_string(), status_code.to_string());
427 if let Some(content_type) = &content_type {
428 metadata.insert("content_type".to_string(), content_type.clone());
429 }
430
431 Ok(DataSourceContent {
432 content,
433 content_type,
434 metadata,
435 })
436 }
437}
438
439#[async_trait::async_trait]
440impl DataSource for HttpDataSource {
441 async fn load(&self) -> Result<DataSourceContent> {
442 {
444 let cached_guard = self.cached_content.lock().unwrap();
445 let last_fetch_guard = self.last_fetch.lock().unwrap();
446
447 if let (Some(ref cached), Some(ref last_fetch), Some(refresh_interval)) =
448 (cached_guard.as_ref(), last_fetch_guard.as_ref(), self.refresh_interval)
449 {
450 if last_fetch.elapsed().as_secs() < refresh_interval {
451 debug!("Using cached HTTP data");
452 return Ok((*cached).clone());
453 }
454 }
455 }
456
457 let content = self.fetch().await?;
459 {
460 let mut last_fetch = self.last_fetch.lock().unwrap();
461 let mut cached = self.cached_content.lock().unwrap();
462 *last_fetch = Some(std::time::Instant::now());
463 *cached = Some(content.clone());
464 }
465
466 Ok(content)
467 }
468
469 async fn check_updated(&self) -> Result<bool> {
470 let last_fetch = self.last_fetch.lock().unwrap();
472 if let (Some(ref last_fetch), Some(refresh_interval)) =
473 (last_fetch.as_ref(), self.refresh_interval)
474 {
475 Ok(last_fetch.elapsed().as_secs() >= refresh_interval)
476 } else {
477 Ok(true)
479 }
480 }
481
482 fn source_type(&self) -> DataSourceType {
483 DataSourceType::Http
484 }
485}
486
487pub struct DataSourceFactory;
489
490impl DataSourceFactory {
491 pub fn create(config: DataSourceConfig) -> Result<Box<dyn DataSource + Send + Sync>> {
493 match config.source_type {
494 DataSourceType::Local => Ok(Box::new(LocalDataSource::new(&config.location))),
495 DataSourceType::Git => {
496 let git_source = GitDataSource::new(config)?;
497 Ok(Box::new(git_source))
498 }
499 DataSourceType::Http => Ok(Box::new(HttpDataSource::new(config))),
500 }
501 }
502}
503
504pub struct DataSourceManager {
506 sources: HashMap<String, std::sync::Arc<dyn DataSource + Send + Sync>>,
507}
508
509impl DataSourceManager {
510 pub fn new() -> Self {
512 Self {
513 sources: HashMap::new(),
514 }
515 }
516
517 pub fn register(&mut self, name: String, source: Box<dyn DataSource + Send + Sync>) {
519 self.sources.insert(name, std::sync::Arc::from(source));
520 }
521
522 pub async fn load(&self, name: &str) -> Result<DataSourceContent> {
524 let source = self
525 .sources
526 .get(name)
527 .ok_or_else(|| Error::generic(format!("Data source '{}' not found", name)))?;
528
529 source.load().await
530 }
531
532 pub async fn check_updated(&self, name: &str) -> Result<bool> {
534 if let Some(_source) = self.sources.get(name) {
537 Ok(true)
540 } else {
541 Err(Error::generic(format!("Data source '{}' not found", name)))
542 }
543 }
544
545 pub fn list_sources(&self) -> Vec<String> {
547 self.sources.keys().cloned().collect()
548 }
549}
550
551impl Default for DataSourceManager {
552 fn default() -> Self {
553 Self::new()
554 }
555}
556
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a config of the given kind with every optional field unset.
    fn minimal_config(source_type: DataSourceType, location: &str) -> DataSourceConfig {
        DataSourceConfig {
            source_type,
            location: location.to_string(),
            branch: None,
            auth_token: None,
            path: None,
            cache_dir: None,
            refresh_interval: None,
        }
    }

    // A local source reports the local type.
    #[test]
    fn test_local_data_source_creation() {
        let source = LocalDataSource::new("./test.json");
        assert_eq!(source.source_type(), DataSourceType::Local);
    }

    // A Git source can be built from an https URL and reports the Git type.
    #[test]
    fn test_git_data_source_config() {
        let config = DataSourceConfig {
            branch: Some("main".to_string()),
            path: Some("data/test.json".to_string()),
            ..minimal_config(DataSourceType::Git, "https://github.com/user/repo.git")
        };

        let source = GitDataSource::new(config).unwrap();
        assert_eq!(source.source_type(), DataSourceType::Git);
    }

    // An HTTP source with token and refresh interval reports the HTTP type.
    #[test]
    fn test_http_data_source_config() {
        let config = DataSourceConfig {
            auth_token: Some("token123".to_string()),
            refresh_interval: Some(60),
            ..minimal_config(DataSourceType::Http, "https://api.example.com/data.json")
        };

        let source = HttpDataSource::new(config);
        assert_eq!(source.source_type(), DataSourceType::Http);
    }

    // The factory maps a local config to a local source.
    #[test]
    fn test_data_source_factory() {
        let local_config = minimal_config(DataSourceType::Local, "./test.json");

        let source = DataSourceFactory::create(local_config).unwrap();
        assert_eq!(source.source_type(), DataSourceType::Local);
    }
}