use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::path::Path;
use tokio::fs;
/// Record that ties a session ID to a job ID, together with the workflow name
/// and the time the record was created.
///
/// The type is serde-serializable; `store` writes it as pretty-printed JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionJobMapping {
/// Session identifier; also used as the file stem of one of the stored copies.
pub session_id: String,
/// Job identifier; also used as the file stem of the other stored copy.
pub job_id: String,
/// Name of the workflow recorded alongside the session/job pair.
pub workflow_name: String,
/// Creation timestamp in UTC; `new` sets it to the current time.
pub created_at: DateTime<Utc>,
}
impl SessionJobMapping {
    /// Creates a mapping between `session_id` and `job_id` for `workflow_name`.
    ///
    /// `created_at` is set to the current UTC time.
    pub fn new(session_id: String, job_id: String, workflow_name: String) -> Self {
        Self {
            session_id,
            job_id,
            workflow_name,
            created_at: Utc::now(),
        }
    }

    /// Returns the path of the JSON file that holds the mapping keyed by `id`.
    ///
    /// Session IDs and job IDs share one directory, so either kind of ID
    /// resolves to `<storage_dir>/mappings/<id>.json`.
    ///
    /// NOTE(review): `id` is interpolated unchanged, so an ID containing path
    /// separators would resolve outside `mappings/` — confirm that IDs are
    /// always generated internally.
    fn mapping_path(storage_dir: &Path, id: &str) -> std::path::PathBuf {
        storage_dir.join("mappings").join(format!("{}.json", id))
    }

    /// Reads and deserializes the mapping stored at `mapping_file`.
    ///
    /// A missing file yields `Ok(None)`. The file is read directly instead of
    /// being probed with `Path::exists()` first: that avoids a blocking call on
    /// the async executor and closes the window in which the file could vanish
    /// between the check and the read.
    async fn load_from(mapping_file: &Path) -> Result<Option<Self>> {
        match fs::read(mapping_file).await {
            Ok(data) => serde_json::from_slice(&data)
                .map(Some)
                .context("Failed to deserialize session-job mapping"),
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
            Err(err) => Err(err).with_context(|| {
                format!("Failed to read mapping file {}", mapping_file.display())
            }),
        }
    }

    /// Persists this mapping as pretty-printed JSON under `<storage_dir>/mappings`.
    ///
    /// Two identical copies are written, one named after the session ID and one
    /// named after the job ID, so the mapping can be looked up by either key.
    /// The directory is created if it does not exist.
    ///
    /// # Errors
    ///
    /// Returns an error if the directory cannot be created, the mapping cannot
    /// be serialized, or either file cannot be written. The two writes are not
    /// atomic as a pair: a failure on the second leaves the first in place.
    pub async fn store(&self, storage_dir: &Path) -> Result<()> {
        let mapping_dir = storage_dir.join("mappings");
        fs::create_dir_all(&mapping_dir).await.with_context(|| {
            format!(
                "Failed to create mapping directory {}",
                mapping_dir.display()
            )
        })?;

        let session_file = Self::mapping_path(storage_dir, &self.session_id);
        let job_file = Self::mapping_path(storage_dir, &self.job_id);
        let json =
            serde_json::to_vec_pretty(self).context("Failed to serialize session-job mapping")?;

        fs::write(&session_file, &json)
            .await
            .with_context(|| format!("Failed to write mapping file {}", session_file.display()))?;
        fs::write(&job_file, &json)
            .await
            .with_context(|| format!("Failed to write mapping file {}", job_file.display()))?;
        Ok(())
    }

    /// Loads the mapping stored under `session_id`.
    ///
    /// Returns `Ok(None)` when no mapping file exists for that session.
    ///
    /// # Errors
    ///
    /// Returns an error if the file exists but cannot be read or does not
    /// contain a valid serialized mapping.
    pub async fn load_by_session(session_id: &str, storage_dir: &Path) -> Result<Option<Self>> {
        let mapping_file = Self::mapping_path(storage_dir, session_id);
        Self::load_from(&mapping_file).await
    }

    /// Loads the mapping stored under `job_id`.
    ///
    /// Returns `Ok(None)` when no mapping file exists for that job.
    ///
    /// # Errors
    ///
    /// Returns an error if the file exists but cannot be read or does not
    /// contain a valid serialized mapping.
    pub async fn load_by_job(job_id: &str, storage_dir: &Path) -> Result<Option<Self>> {
        let mapping_file = Self::mapping_path(storage_dir, job_id);
        Self::load_from(&mapping_file).await
    }

    /// Reports whether a mapping file exists for `session_id`.
    ///
    /// Uses an asynchronous metadata lookup so the executor thread is not
    /// blocked. Any lookup failure (missing file, permission error, ...) is
    /// reported as `false`.
    pub async fn exists_for_session(session_id: &str, storage_dir: &Path) -> bool {
        fs::metadata(Self::mapping_path(storage_dir, session_id))
            .await
            .is_ok()
    }

    /// Reports whether a mapping file exists for `job_id`.
    ///
    /// Uses an asynchronous metadata lookup so the executor thread is not
    /// blocked. Any lookup failure (missing file, permission error, ...) is
    /// reported as `false`.
    pub async fn exists_for_job(job_id: &str, storage_dir: &Path) -> bool {
        fs::metadata(Self::mapping_path(storage_dir, job_id))
            .await
            .is_ok()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds the fixture mapping shared by the storage tests.
    fn sample_mapping() -> SessionJobMapping {
        SessionJobMapping::new(
            String::from("session-123"),
            String::from("mapreduce-456"),
            String::from("test-workflow"),
        )
    }

    /// Asserts that `loaded` carries the identifiers of the fixture mapping.
    fn assert_is_sample(loaded: &SessionJobMapping) {
        assert_eq!(loaded.session_id, "session-123");
        assert_eq!(loaded.job_id, "mapreduce-456");
        assert_eq!(loaded.workflow_name, "test-workflow");
    }

    /// A stored mapping can be loaded back by its session ID and by its job ID.
    #[tokio::test]
    async fn test_session_job_mapping_storage() {
        let workspace = TempDir::new().unwrap();
        let root = workspace.path();

        sample_mapping().store(root).await.unwrap();

        let via_session = SessionJobMapping::load_by_session("session-123", root)
            .await
            .unwrap();
        assert_is_sample(&via_session.unwrap());

        let via_job = SessionJobMapping::load_by_job("mapreduce-456", root)
            .await
            .unwrap();
        assert_is_sample(&via_job.unwrap());
    }

    /// Existence checks report `false` before `store` and `true` afterwards.
    #[tokio::test]
    async fn test_mapping_existence_checks() {
        let workspace = TempDir::new().unwrap();
        let root = workspace.path();

        let session_before = SessionJobMapping::exists_for_session("session-123", root).await;
        assert!(!session_before);
        let job_before = SessionJobMapping::exists_for_job("mapreduce-456", root).await;
        assert!(!job_before);

        sample_mapping().store(root).await.unwrap();

        let session_after = SessionJobMapping::exists_for_session("session-123", root).await;
        assert!(session_after);
        let job_after = SessionJobMapping::exists_for_job("mapreduce-456", root).await;
        assert!(job_after);
    }

    /// Loading an ID that was never stored yields `None` rather than an error.
    #[tokio::test]
    async fn test_load_nonexistent_mapping() {
        let workspace = TempDir::new().unwrap();
        let root = workspace.path();

        let missing_session = SessionJobMapping::load_by_session("nonexistent", root)
            .await
            .unwrap();
        assert!(missing_session.is_none());

        let missing_job = SessionJobMapping::load_by_job("nonexistent", root)
            .await
            .unwrap();
        assert!(missing_job.is_none());
    }
}