use crate::{Error, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use tracing::{debug, warn};
/// Kind of backend a [`DataSourceConfig`] refers to.
///
/// Serialized in lowercase (`"local"`, `"git"`, `"http"`) because of
/// `rename_all = "lowercase"`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum DataSourceType {
    /// A file on the local filesystem, handled by [`LocalDataSource`].
    Local,
    /// A file inside a Git repository cloned into a local cache, handled by
    /// [`GitDataSource`].
    Git,
    /// A resource fetched with an HTTP GET request, handled by [`HttpDataSource`].
    Http,
}
/// Declarative description of a data source, typically deserialized from config.
///
/// Which fields matter depends on `source_type`; unused optional fields are
/// omitted when serializing.
///
/// `Debug` is implemented by hand so that `auth_token` is never printed: config
/// values are commonly logged with `{:?}` and the token is a credential.
#[derive(Clone, Serialize, Deserialize)]
pub struct DataSourceConfig {
    /// Backend kind; serialized under the key `type`.
    #[serde(rename = "type")]
    pub source_type: DataSourceType,
    /// File path (local), repository URL (git) or request URL (http).
    pub location: String,
    /// Git only: branch to check out. `GitDataSource` uses `"main"` when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub branch: Option<String>,
    /// Credential: injected into HTTPS clone URLs for Git, sent as a bearer
    /// token for HTTP.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub auth_token: Option<String>,
    /// Git only: path of the file to read, relative to the repository root.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub path: Option<String>,
    /// Git only: directory holding cached clones. `GitDataSource` uses
    /// `./.mockforge-data-cache` when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cache_dir: Option<PathBuf>,
    /// HTTP only: number of seconds a fetched response may be served from cache.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub refresh_interval: Option<u64>,
}

impl std::fmt::Debug for DataSourceConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DataSourceConfig")
            .field("source_type", &self.source_type)
            .field("location", &self.location)
            .field("branch", &self.branch)
            // Show only whether a token is present, never its value.
            .field("auth_token", &self.auth_token.as_ref().map(|_| "<redacted>"))
            .field("path", &self.path)
            .field("cache_dir", &self.cache_dir)
            .field("refresh_interval", &self.refresh_interval)
            .finish()
    }
}
/// Data returned by [`DataSource::load`].
#[derive(Debug, Clone)]
pub struct DataSourceContent {
    /// Raw bytes exactly as read from the file or response body.
    pub content: Vec<u8>,
    /// MIME type if known: guessed from the file extension for local/Git
    /// sources, taken from the `Content-Type` header for HTTP sources.
    pub content_type: Option<String>,
    /// Source-specific details (for example `path`, `size`, `url`, `status`,
    /// `commit`); the set of keys differs per implementation.
    pub metadata: HashMap<String, String>,
}
/// A source of raw data that can be loaded on demand.
///
/// Declared through `async_trait` so implementations can be used as
/// `dyn DataSource` trait objects (as `DataSourceFactory` and
/// `DataSourceManager` do).
#[async_trait::async_trait]
pub trait DataSource: Send + Sync {
    /// Loads the complete current content of the source.
    async fn load(&self) -> Result<DataSourceContent>;
    /// Reports whether the source may have changed since it was last loaded.
    ///
    /// Implementations may answer conservatively; `LocalDataSource`, for
    /// example, always returns `true`.
    async fn check_updated(&self) -> Result<bool>;
    /// Returns which kind of backend this source is.
    fn source_type(&self) -> DataSourceType;
}
/// Data source backed by a single file on the local filesystem.
pub struct LocalDataSource {
    /// File that `load` reads.
    path: PathBuf,
}

impl LocalDataSource {
    /// Creates a source for the file at `path`; the path is copied into an
    /// owned `PathBuf` and not checked for existence here.
    pub fn new(path: impl AsRef<Path>) -> Self {
        let owned = PathBuf::from(path.as_ref());
        LocalDataSource { path: owned }
    }
}
#[async_trait::async_trait]
impl DataSource for LocalDataSource {
    /// Reads the whole file into memory.
    ///
    /// The content type is guessed from the file extension, matched
    /// case-insensitively: `json`, `yaml`/`yml`, `xml` and `csv` map to their
    /// MIME types, and any other extension becomes `text/<ext>` (lowercased).
    /// Metadata always has `path`; `size` and `modified` (the `Debug` rendering
    /// of the modification `SystemTime`) are added when file metadata is readable.
    ///
    /// # Errors
    /// Returns an I/O error if the file cannot be read.
    async fn load(&self) -> Result<DataSourceContent> {
        debug!("Loading data from local file: {}", self.path.display());
        let content = tokio::fs::read(&self.path).await.map_err(|e| {
            Error::io_with_context(
                format!("reading local file {}", self.path.display()),
                e.to_string(),
            )
        })?;
        // Lowercase first so e.g. `data.JSON` is still recognised as JSON.
        let content_type = self
            .path
            .extension()
            .and_then(|ext| ext.to_str())
            .map(|ext| match ext.to_ascii_lowercase().as_str() {
                "json" => "application/json".to_string(),
                "yaml" | "yml" => "application/x-yaml".to_string(),
                "xml" => "application/xml".to_string(),
                "csv" => "text/csv".to_string(),
                other => format!("text/{}", other),
            });
        let mut metadata = HashMap::new();
        // Metadata is best-effort: failing to stat the file does not fail the load.
        if let Ok(metadata_info) = tokio::fs::metadata(&self.path).await {
            metadata.insert("size".to_string(), metadata_info.len().to_string());
            if let Ok(modified) = metadata_info.modified() {
                metadata.insert("modified".to_string(), format!("{:?}", modified));
            }
        }
        metadata.insert("path".to_string(), self.path.display().to_string());
        Ok(DataSourceContent {
            content,
            content_type,
            metadata,
        })
    }

    /// Always reports `true`: no modification state is tracked for local files.
    async fn check_updated(&self) -> Result<bool> {
        Ok(true)
    }

    fn source_type(&self) -> DataSourceType {
        DataSourceType::Local
    }
}
/// Data source that reads one file from a Git repository.
///
/// The repository is cloned into (or updated inside) a local checkout by
/// invoking the `git` executable.
pub struct GitDataSource {
    /// Configuration this source was built from (URL, branch, token, file path,
    /// cache directory).
    config: DataSourceConfig,
    /// Local checkout directory: `<cache_dir>/<repository name>`.
    repo_path: PathBuf,
}
impl GitDataSource {
pub fn new(config: DataSourceConfig) -> Result<Self> {
let repo_name = Self::extract_repo_name(&config.location)?;
let cache_dir = config
.cache_dir
.clone()
.unwrap_or_else(|| PathBuf::from("./.mockforge-data-cache"));
let repo_path = cache_dir.join(repo_name);
Ok(Self { config, repo_path })
}
fn extract_repo_name(url: &str) -> Result<String> {
let name = if let Some(stripped) = url.strip_suffix(".git") {
stripped
} else {
url
};
let parts: Vec<&str> = name.split('/').collect();
if let Some(last) = parts.last() {
let clean = last.split('?').next().unwrap_or(last);
Ok(clean.to_string())
} else {
Err(Error::config(format!("Invalid Git repository URL: {}", url)))
}
}
async fn ensure_repo(&self) -> Result<()> {
use std::process::Command;
if let Some(parent) = self.repo_path.parent() {
tokio::fs::create_dir_all(parent)
.await
.map_err(|e| Error::io_with_context("creating cache directory", e.to_string()))?;
}
if self.repo_path.exists() {
debug!("Updating Git repository: {}", self.repo_path.display());
let branch = self.config.branch.as_deref().unwrap_or("main");
let repo_path_str = self.repo_path.to_str().unwrap();
let output = Command::new("git")
.args(["-C", repo_path_str, "fetch", "origin", branch])
.output()
.map_err(|e| Error::io_with_context("git fetch", e.to_string()))?;
if !output.status.success() {
warn!("Git fetch failed, continuing anyway");
}
let output = Command::new("git")
.args([
"-C",
repo_path_str,
"reset",
"--hard",
&format!("origin/{}", branch),
])
.output()
.map_err(|e| Error::io_with_context("git reset", e.to_string()))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(Error::io_with_context("git reset", stderr.to_string()));
}
} else {
debug!("Cloning Git repository: {}", self.config.location);
let url = if let Some(ref token) = self.config.auth_token {
Self::inject_auth_token(&self.config.location, token)?
} else {
self.config.location.clone()
};
let branch = self.config.branch.as_deref().unwrap_or("main");
let repo_path_str = self.repo_path.to_str().unwrap();
let output = Command::new("git")
.args([
"clone",
"--branch",
branch,
"--depth",
"1",
&url,
repo_path_str,
])
.output()
.map_err(|e| Error::io_with_context("git clone", e.to_string()))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(Error::io_with_context("git clone", stderr.to_string()));
}
}
Ok(())
}
fn inject_auth_token(url: &str, token: &str) -> Result<String> {
if url.starts_with("https://") {
if let Some(rest) = url.strip_prefix("https://") {
return Ok(format!("https://{}@{}", token, rest));
}
}
if url.contains('@') {
warn!("SSH URL detected. Token authentication may not work.");
}
Ok(url.to_string())
}
}
#[async_trait::async_trait]
impl DataSource for GitDataSource {
async fn load(&self) -> Result<DataSourceContent> {
self.ensure_repo().await?;
let file_path = if let Some(ref path) = self.config.path {
self.repo_path.join(path)
} else {
return Err(Error::config(
"Git data source requires a 'path' to specify the file within the repository"
.to_string(),
));
};
if !file_path.exists() {
return Err(Error::not_found(
"file in Git repository".to_string(),
file_path.display().to_string(),
));
}
let content = tokio::fs::read(&file_path).await.map_err(|e| {
Error::io_with_context("reading file from Git repository", e.to_string())
})?;
let content_type =
file_path.extension().and_then(|ext| ext.to_str()).map(|ext| match ext {
"json" => "application/json".to_string(),
"yaml" | "yml" => "application/x-yaml".to_string(),
_ => format!("text/{}", ext),
});
let mut metadata = HashMap::new();
metadata.insert("source".to_string(), "git".to_string());
metadata.insert("repository".to_string(), self.config.location.clone());
if let Some(ref branch) = self.config.branch {
metadata.insert("branch".to_string(), branch.clone());
}
metadata.insert("path".to_string(), file_path.display().to_string());
use std::process::Command;
if let Ok(output) = Command::new("git")
.args(["-C", self.repo_path.to_str().unwrap(), "rev-parse", "HEAD"])
.output()
{
if output.status.success() {
if let Ok(commit) = String::from_utf8(output.stdout) {
metadata.insert("commit".to_string(), commit.trim().to_string());
}
}
}
Ok(DataSourceContent {
content,
content_type,
metadata,
})
}
async fn check_updated(&self) -> Result<bool> {
use std::process::Command;
let branch = self.config.branch.as_deref().unwrap_or("main");
let repo_path_str = self.repo_path.to_str().unwrap();
let _output = Command::new("git")
.args(["-C", repo_path_str, "fetch", "origin", branch])
.output();
let output = Command::new("git")
.args([
"-C",
repo_path_str,
"rev-list",
"--count",
&format!("HEAD..origin/{}", branch),
])
.output()
.map_err(|e| Error::io_with_context("checking git for updates", e.to_string()))?;
if output.status.success() {
if let Ok(count_str) = String::from_utf8(output.stdout) {
if let Ok(count) = count_str.trim().parse::<u32>() {
return Ok(count > 0);
}
}
}
Ok(false)
}
fn source_type(&self) -> DataSourceType {
DataSourceType::Git
}
}
/// Data source that downloads its content with an HTTP GET request.
///
/// The last successful response is kept in memory together with its fetch time
/// so that loads within `refresh_interval` seconds can skip the network.
pub struct HttpDataSource {
    /// Request URL (from `DataSourceConfig::location`).
    url: String,
    /// Optional bearer token sent with every request.
    auth_token: Option<String>,
    /// Cache lifetime in seconds; `None` means cached content is never served.
    refresh_interval: Option<u64>,
    /// Time of the last successful fetch.
    last_fetch: std::sync::Arc<std::sync::Mutex<Option<std::time::Instant>>>,
    /// Content of the last successful fetch.
    cached_content: std::sync::Arc<std::sync::Mutex<Option<DataSourceContent>>>,
    /// Client reused across fetches so its connection pool is shared instead of
    /// being rebuilt (and discarded) on every request.
    client: reqwest::Client,
}

impl HttpDataSource {
    /// Builds an HTTP source from `config`, taking `location`, `auth_token` and
    /// `refresh_interval`; no request is made here.
    ///
    /// # Panics
    /// `reqwest::Client::new` panics if the TLS backend or system resolver
    /// configuration cannot be initialised.
    pub fn new(config: DataSourceConfig) -> Self {
        Self {
            url: config.location,
            auth_token: config.auth_token,
            refresh_interval: config.refresh_interval,
            last_fetch: std::sync::Arc::new(std::sync::Mutex::new(None)),
            cached_content: std::sync::Arc::new(std::sync::Mutex::new(None)),
            client: reqwest::Client::new(),
        }
    }

    /// Performs the GET request and packages the response body.
    ///
    /// Metadata records `source = "http"`, `url`, the numeric `status`, and
    /// `content_type` when the server sent a `Content-Type` header.
    ///
    /// # Errors
    /// Fails if the request cannot be sent, the status is not 2xx, or the body
    /// cannot be read.
    async fn fetch(&self) -> Result<DataSourceContent> {
        let mut request = self.client.get(&self.url);
        if let Some(ref token) = self.auth_token {
            request = request.bearer_auth(token);
        }
        let response = request.send().await.map_err(|e| {
            Error::internal(format!("Failed to fetch data from {}: {}", self.url, e))
        })?;
        let status = response.status();
        let status_code = status.as_u16();
        if !status.is_success() {
            return Err(Error::internal(format!("HTTP request failed with status {}", status)));
        }
        let content_type = response
            .headers()
            .get("content-type")
            .and_then(|h| h.to_str().ok())
            .map(|s| s.to_string());
        let content = response
            .bytes()
            .await
            .map_err(|e| Error::internal(format!("Failed to read response body: {}", e)))?
            .to_vec();
        let mut metadata = HashMap::new();
        metadata.insert("source".to_string(), "http".to_string());
        metadata.insert("url".to_string(), self.url.clone());
        metadata.insert("status".to_string(), status_code.to_string());
        if let Some(content_type) = &content_type {
            metadata.insert("content_type".to_string(), content_type.clone());
        }
        Ok(DataSourceContent {
            content,
            content_type,
            metadata,
        })
    }
}
#[async_trait::async_trait]
impl DataSource for HttpDataSource {
    /// Serves the cached response if it is younger than `refresh_interval`
    /// seconds; otherwise fetches again and replaces the cache.
    ///
    /// Without a `refresh_interval` every call fetches. Poisoned mutexes are
    /// recovered rather than propagated.
    ///
    /// # Errors
    /// Propagates fetch errors; the cache is left untouched when a fetch fails.
    async fn load(&self) -> Result<DataSourceContent> {
        // Both mutexes are always locked in the order `last_fetch`, then
        // `cached_content`. Locking them in opposite orders in the read and
        // write sections could deadlock two concurrent `load` calls.
        // Guards are confined to blocks so none is held across `.await`.
        {
            let last_fetch_guard =
                self.last_fetch.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
            let cached_guard =
                self.cached_content.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
            if let (Some(cached), Some(last_fetch), Some(refresh_interval)) =
                (cached_guard.as_ref(), last_fetch_guard.as_ref(), self.refresh_interval)
            {
                if last_fetch.elapsed().as_secs() < refresh_interval {
                    debug!("Using cached HTTP data");
                    return Ok(cached.clone());
                }
            }
        }
        let content = self.fetch().await?;
        {
            let mut last_fetch =
                self.last_fetch.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
            let mut cached =
                self.cached_content.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
            *last_fetch = Some(std::time::Instant::now());
            *cached = Some(content.clone());
        }
        Ok(content)
    }

    /// Returns `true` when there is no successful fetch yet, no refresh interval
    /// is configured, or the interval has elapsed since the last fetch.
    async fn check_updated(&self) -> Result<bool> {
        let last_fetch = self.last_fetch.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
        if let (Some(last_fetch), Some(refresh_interval)) =
            (last_fetch.as_ref(), self.refresh_interval)
        {
            Ok(last_fetch.elapsed().as_secs() >= refresh_interval)
        } else {
            Ok(true)
        }
    }

    fn source_type(&self) -> DataSourceType {
        DataSourceType::Http
    }
}
/// Builds a boxed [`DataSource`] matching a config's `source_type`.
pub struct DataSourceFactory;
impl DataSourceFactory {
    /// Creates the data source described by `config`.
    ///
    /// Local sources use `config.location` as the file path; Git and HTTP
    /// sources take ownership of the whole config.
    ///
    /// # Errors
    /// Propagates the error from `GitDataSource::new` for Git configs.
    pub fn create(config: DataSourceConfig) -> Result<Box<dyn DataSource + Send + Sync>> {
        let source: Box<dyn DataSource + Send + Sync> = match config.source_type {
            DataSourceType::Local => Box::new(LocalDataSource::new(&config.location)),
            DataSourceType::Git => Box::new(GitDataSource::new(config)?),
            DataSourceType::Http => Box::new(HttpDataSource::new(config)),
        };
        Ok(source)
    }
}
/// Registry of data sources addressed by name.
pub struct DataSourceManager {
    /// Registered sources, keyed by the name passed to `register`.
    sources: HashMap<String, std::sync::Arc<dyn DataSource + Send + Sync>>,
}
impl DataSourceManager {
pub fn new() -> Self {
Self {
sources: HashMap::new(),
}
}
pub fn register(&mut self, name: String, source: Box<dyn DataSource + Send + Sync>) {
self.sources.insert(name, std::sync::Arc::from(source));
}
pub async fn load(&self, name: &str) -> Result<DataSourceContent> {
let source = self.sources.get(name).ok_or_else(|| Error::not_found("data source", name))?;
source.load().await
}
pub async fn check_updated(&self, name: &str) -> Result<bool> {
let source = self.sources.get(name).ok_or_else(|| Error::not_found("data source", name))?;
source.check_updated().await
}
pub fn list_sources(&self) -> Vec<String> {
self.sources.keys().cloned().collect()
}
}
impl Default for DataSourceManager {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Config with only `source_type` and `location` set; individual tests
    /// override other fields with struct-update syntax.
    fn base_config(source_type: DataSourceType, location: &str) -> DataSourceConfig {
        DataSourceConfig {
            source_type,
            location: location.to_string(),
            branch: None,
            auth_token: None,
            path: None,
            cache_dir: None,
            refresh_interval: None,
        }
    }

    #[test]
    fn test_local_data_source_creation() {
        let source = LocalDataSource::new("./test.json");
        assert_eq!(source.source_type(), DataSourceType::Local);
    }

    #[test]
    fn test_git_data_source_config() {
        let config = DataSourceConfig {
            branch: Some("main".to_string()),
            path: Some("data/test.json".to_string()),
            ..base_config(DataSourceType::Git, "https://github.com/user/repo.git")
        };
        let source = GitDataSource::new(config).unwrap();
        assert_eq!(source.source_type(), DataSourceType::Git);
    }

    #[test]
    fn test_git_repo_path_is_under_cache_dir() {
        let config = DataSourceConfig {
            cache_dir: Some(PathBuf::from("/tmp/cache")),
            ..base_config(DataSourceType::Git, "https://github.com/user/repo.git")
        };
        let source = GitDataSource::new(config).unwrap();
        assert_eq!(source.repo_path, PathBuf::from("/tmp/cache").join("repo"));
    }

    #[test]
    fn test_extract_repo_name() {
        let name = |url: &str| GitDataSource::extract_repo_name(url).unwrap();
        assert_eq!(name("https://github.com/user/repo.git"), "repo");
        assert_eq!(name("https://github.com/user/repo"), "repo");
        assert_eq!(name("git@github.com:user/repo.git"), "repo");
    }

    #[test]
    fn test_inject_auth_token() {
        assert_eq!(
            GitDataSource::inject_auth_token("https://github.com/user/repo.git", "tok").unwrap(),
            "https://tok@github.com/user/repo.git"
        );
        // Non-HTTPS URLs are returned unchanged.
        assert_eq!(
            GitDataSource::inject_auth_token("git@github.com:user/repo.git", "tok").unwrap(),
            "git@github.com:user/repo.git"
        );
    }

    #[test]
    fn test_http_data_source_config() {
        let config = DataSourceConfig {
            auth_token: Some("token123".to_string()),
            refresh_interval: Some(60),
            ..base_config(DataSourceType::Http, "https://api.example.com/data.json")
        };
        let source = HttpDataSource::new(config);
        assert_eq!(source.source_type(), DataSourceType::Http);
    }

    #[test]
    fn test_data_source_factory() {
        for (kind, location) in [
            (DataSourceType::Local, "./test.json"),
            (DataSourceType::Git, "https://github.com/user/repo.git"),
            (DataSourceType::Http, "https://api.example.com/data.json"),
        ] {
            let source = DataSourceFactory::create(base_config(kind.clone(), location)).unwrap();
            assert_eq!(source.source_type(), kind);
        }
    }

    #[test]
    fn test_manager_register_and_list() {
        let mut manager = DataSourceManager::new();
        assert!(manager.list_sources().is_empty());
        manager.register("users".to_string(), Box::new(LocalDataSource::new("./users.json")));
        assert_eq!(manager.list_sources(), vec!["users".to_string()]);
    }
}