use super::state::{
Checkpoint, CheckpointId, SessionConfig, SessionFilter, SessionId, SessionStatus,
SessionSummary, UnifiedSession,
};
use super::storage::SessionStorage;
use super::{checkpoints, filters, lifecycle, updates};
use crate::storage::GlobalStorage;
use anyhow::{anyhow, Result};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
/// A single mutation that `SessionManager::update_session` applies to a session.
///
/// Each variant is dispatched to a helper in the sibling `lifecycle` or
/// `updates` module; the notes below describe only what this file shows.
#[derive(Debug, Clone)]
pub enum SessionUpdate {
/// New status, applied through `lifecycle::apply_status_update`, which is
/// fallible (its error is propagated to the caller).
Status(SessionStatus),
/// Metadata entries, applied through `updates::apply_metadata_update`.
/// The tests in this file expect supplied keys to be readable from
/// `session.metadata` afterwards, and a `files_changed_delta` entry to
/// accumulate into the workflow's `files_changed` counter.
Metadata(HashMap<String, serde_json::Value>),
/// JSON payload handed to `updates::apply_checkpoint_update`.
Checkpoint(serde_json::Value),
/// Error message, applied through `updates::apply_error_update`. The tests
/// in this file expect the session to end up `Failed` with `error` set.
Error(String),
/// Progress counters, applied through `updates::apply_progress_update`.
/// For workflow sessions the tests expect these to land in
/// `current_step` / `total_steps`.
Progress {
current: usize,
total: usize,
},
/// Named operation and its duration, handed to
/// `updates::apply_timing_update`.
Timing {
operation: String,
duration: std::time::Duration,
},
}
/// Creates, loads, updates and deletes sessions, pairing a persistent store
/// with an in-memory map of the sessions this manager has touched.
pub struct SessionManager {
/// Backing store used to save, load, list and delete sessions.
storage: SessionStorage,
/// Sessions created or loaded through this manager, keyed by id. Lookups
/// consult this map before falling back to `storage`.
active_sessions: Arc<RwLock<HashMap<SessionId, UnifiedSession>>>,
}
impl SessionManager {
/// Builds a manager on top of `storage` with an empty in-memory session map.
///
/// The signature is `async` and fallible, but the body as written neither
/// awaits nor produces an error of its own.
pub async fn new(storage: GlobalStorage) -> Result<Self> {
    let storage = SessionStorage::new(storage);
    let active_sessions = Arc::new(RwLock::new(HashMap::new()));
    Ok(Self {
        storage,
        active_sessions,
    })
}
/// Creates a new session from `config`, persists it, and registers it in the
/// in-memory map.
///
/// A `Workflow` session requires `config.workflow_id` (the name defaults to
/// an empty string when absent); a `MapReduce` session requires
/// `config.job_id`. `config.metadata` replaces the metadata of the freshly
/// constructed session.
///
/// # Errors
///
/// Returns an error when the required id for the session type is missing,
/// when the storage layer fails to save the session, or when the lock
/// guarding the in-memory map is poisoned.
pub async fn create_session(&self, config: SessionConfig) -> Result<SessionId> {
    let mut session = match config.session_type {
        super::state::SessionType::Workflow => {
            let workflow_id = config
                .workflow_id
                .ok_or_else(|| anyhow!("Workflow ID required for workflow session"))?;
            let workflow_name = config.workflow_name.unwrap_or_default();
            UnifiedSession::new_workflow(workflow_id, workflow_name)
        }
        super::state::SessionType::MapReduce => {
            let job_id = config
                .job_id
                .ok_or_else(|| anyhow!("Job ID required for MapReduce session"))?;
            // NOTE(review): the meaning of the literal `0` is defined by
            // `UnifiedSession::new_mapreduce`; confirm it is the intended
            // initial value.
            UnifiedSession::new_mapreduce(job_id, 0)
        }
    };
    session.metadata = config.metadata;
    let session_id = session.id.clone();

    // Persist before publishing to the in-memory map: if the save fails the
    // caller gets an error and must not find a half-created session listed
    // as active.
    self.storage.save(&session).await?;

    // The guard is a temporary of this statement, so it is released before
    // the function returns and is never held across an `.await`.
    self.active_sessions
        .write()
        .map_err(|e| anyhow!("Failed to acquire write lock on active sessions: {}", e))?
        .insert(session_id.clone(), session);

    Ok(session_id)
}
/// Returns a copy of the session identified by `id`.
///
/// The in-memory map is consulted first; on a miss the session is loaded
/// from storage and then remembered in the map for later calls.
///
/// # Errors
///
/// Returns an error when the storage layer cannot load the session or when
/// the lock guarding the in-memory map is poisoned.
pub async fn load_session(&self, id: &SessionId) -> Result<UnifiedSession> {
    // Fast path: clone out of the map; the read guard ends with this statement.
    let cached = self
        .active_sessions
        .read()
        .map_err(|poison| anyhow!("Failed to acquire read lock on active sessions: {}", poison))?
        .get(id)
        .cloned();
    if let Some(hit) = cached {
        return Ok(hit);
    }

    // Slow path: fetch from storage, then remember the result.
    let loaded = self.storage.load(id).await?;
    self.active_sessions
        .write()
        .map_err(|poison| anyhow!("Failed to acquire write lock on active sessions: {}", poison))?
        .insert(id.clone(), loaded.clone());
    Ok(loaded)
}
/// Applies `update` to the session identified by `id`, then persists the
/// result and refreshes the in-memory copy.
///
/// `updated_at` is set to the current UTC time before the update is applied.
/// The update is performed on a cloned snapshot obtained from
/// `load_session`; no lock is held while it is applied or saved.
///
/// # Errors
///
/// Returns an error when the session cannot be loaded, when a status update
/// is rejected by `lifecycle::apply_status_update`, when the storage layer
/// fails to save, or when the lock guarding the in-memory map is poisoned.
/// On any of these errors the in-memory copy is left as it was.
pub async fn update_session(&self, id: &SessionId, update: SessionUpdate) -> Result<()> {
    let mut session = self.load_session(id).await?;
    session.updated_at = chrono::Utc::now();

    match update {
        SessionUpdate::Status(status) => {
            lifecycle::apply_status_update(&mut session, status)?;
        }
        SessionUpdate::Metadata(metadata) => {
            updates::apply_metadata_update(&mut session, metadata);
        }
        SessionUpdate::Checkpoint(state) => {
            updates::apply_checkpoint_update(&mut session, state);
        }
        SessionUpdate::Error(error) => {
            updates::apply_error_update(&mut session, error);
        }
        SessionUpdate::Progress { current, total } => {
            updates::apply_progress_update(&mut session, current, total);
        }
        SessionUpdate::Timing {
            operation,
            duration,
        } => {
            updates::apply_timing_update(&mut session, operation, duration);
        }
    }

    // Persist first so a failed save cannot leave the in-memory copy ahead
    // of what is on disk (which would also make a retry observe the update
    // as already applied).
    self.storage.save(&session).await?;

    // Guard is a statement-scoped temporary; nothing is awaited after it.
    self.active_sessions
        .write()
        .map_err(|e| anyhow!("Failed to acquire write lock on active sessions: {}", e))?
        .insert(id.clone(), session);

    Ok(())
}
/// Forgets the session identified by `id` in memory and deletes it from
/// storage.
///
/// The in-memory entry is removed first (a missing entry is not an error),
/// then the storage layer is asked to delete the session.
///
/// # Errors
///
/// Returns an error when the lock guarding the in-memory map is poisoned or
/// when the storage layer reports a failure.
pub async fn delete_session(&self, id: &SessionId) -> Result<()> {
    // The write guard lives only for this statement, so it is gone before
    // the storage call is awaited.
    self.active_sessions
        .write()
        .map_err(|poison| anyhow!("Failed to acquire write lock on active sessions: {}", poison))?
        .remove(id);

    let outcome = self.storage.delete(id).await;
    outcome
}
/// Requests a transition of session `id` to `SessionStatus::Running`.
///
/// Thin wrapper over `update_session`; its errors are returned unchanged.
pub async fn start_session(&self, id: &SessionId) -> Result<()> {
    let to_running = SessionUpdate::Status(SessionStatus::Running);
    self.update_session(id, to_running).await
}
/// Requests a transition of session `id` to `SessionStatus::Paused`.
///
/// Thin wrapper over `update_session`; its errors are returned unchanged.
pub async fn pause_session(&self, id: &SessionId) -> Result<()> {
    let to_paused = SessionUpdate::Status(SessionStatus::Paused);
    self.update_session(id, to_paused).await
}
/// Requests a transition of session `id` back to `SessionStatus::Running`.
///
/// Issues the same status update as `start_session`; errors from
/// `update_session` are returned unchanged.
pub async fn resume_session(&self, id: &SessionId) -> Result<()> {
    let back_to_running = SessionUpdate::Status(SessionStatus::Running);
    self.update_session(id, back_to_running).await
}
/// Marks session `id` as finished and returns its summary.
///
/// `success == true` requests `SessionStatus::Completed`; otherwise
/// `SessionStatus::Failed`. The session is re-read after the update so the
/// summary reflects the stored state.
///
/// # Errors
///
/// Propagates any error from `update_session` or `load_session`.
pub async fn complete_session(&self, id: &SessionId, success: bool) -> Result<SessionSummary> {
    let final_status = match success {
        true => SessionStatus::Completed,
        false => SessionStatus::Failed,
    };
    self.update_session(id, SessionUpdate::Status(final_status))
        .await?;

    let finished = self.load_session(id).await?;
    let summary = finished.to_summary();
    Ok(summary)
}
/// Captures a checkpoint of session `id`, appends it to the session's
/// checkpoint list, and returns the new checkpoint's id.
///
/// The checkpoint is built by `checkpoints::create_checkpoint_from_session`
/// from the session as it is before the new checkpoint is appended.
/// `updated_at` is refreshed to the current UTC time.
///
/// # Errors
///
/// Returns an error when the session cannot be loaded, when the checkpoint
/// cannot be built, when the storage layer fails to save, or when the lock
/// guarding the in-memory map is poisoned. On error the in-memory copy is
/// left as it was.
pub async fn create_checkpoint(&self, id: &SessionId) -> Result<CheckpointId> {
    let mut session = self.load_session(id).await?;
    let checkpoint = checkpoints::create_checkpoint_from_session(&session)?;
    let checkpoint_id = checkpoint.id.clone();
    session.checkpoints.push(checkpoint);
    session.updated_at = chrono::Utc::now();

    // Persist first so a failed save never leaves a checkpoint visible in
    // memory that does not exist on disk.
    self.storage.save(&session).await?;

    // Guard is a statement-scoped temporary; nothing is awaited after it.
    self.active_sessions
        .write()
        .map_err(|e| anyhow!("Failed to acquire write lock on active sessions: {}", e))?
        .insert(id.clone(), session);

    Ok(checkpoint_id)
}
/// Replaces session `id` with the state rebuilt from one of its checkpoints.
///
/// The checkpoint is looked up in the session's own checkpoint list and the
/// replacement session is produced by
/// `checkpoints::restore_session_from_checkpoint`. The rebuilt session is
/// stored under `id` in the in-memory map.
///
/// # Errors
///
/// Returns an error when the session cannot be loaded, when no checkpoint
/// with `checkpoint_id` exists ("Checkpoint not found"), when the session
/// cannot be rebuilt, when the storage layer fails to save, or when the lock
/// guarding the in-memory map is poisoned. On error the in-memory copy is
/// left as it was.
pub async fn restore_checkpoint(
    &self,
    id: &SessionId,
    checkpoint_id: &CheckpointId,
) -> Result<()> {
    let session = self.load_session(id).await?;
    let checkpoint = checkpoints::find_checkpoint(&session.checkpoints, checkpoint_id)
        .ok_or_else(|| anyhow!("Checkpoint not found"))?;
    let restored_session = checkpoints::restore_session_from_checkpoint(checkpoint)?;

    // Persist first so a failed save cannot leave the rolled-back state in
    // memory while the pre-restore state is still what is on disk.
    self.storage.save(&restored_session).await?;

    // Guard is a statement-scoped temporary; nothing is awaited after it.
    self.active_sessions
        .write()
        .map_err(|e| anyhow!("Failed to acquire write lock on active sessions: {}", e))?
        .insert(id.clone(), restored_session);

    Ok(())
}
/// Returns the checkpoints recorded on session `id`, in stored order.
///
/// # Errors
///
/// Propagates any error from `load_session`.
pub async fn list_checkpoints(&self, id: &SessionId) -> Result<Vec<Checkpoint>> {
    self.load_session(id)
        .await
        .map(|loaded| loaded.checkpoints)
}
/// Lists summaries of every session known to storage, optionally narrowed
/// by `filter`.
///
/// Sessions are read from storage (not from the in-memory map). With
/// `Some(filter)` only sessions accepted by `filters::apply_session_filter`
/// are kept; with `None` every session is summarised.
///
/// # Errors
///
/// Returns an error when the storage layer cannot load the sessions.
pub async fn list_sessions(
    &self,
    filter: Option<SessionFilter>,
) -> Result<Vec<SessionSummary>> {
    let stored = self.storage.load_all().await?;

    let summaries: Vec<SessionSummary> = match filter {
        Some(criteria) => stored
            .into_iter()
            .filter(|candidate| filters::apply_session_filter(candidate, &criteria))
            .map(|kept| kept.to_summary())
            .collect(),
        None => stored
            .into_iter()
            .map(|session| session.to_summary())
            .collect(),
    };

    Ok(summaries)
}
/// Returns the ids of all sessions currently held in the in-memory map.
///
/// The order of the returned ids is whatever order the underlying
/// `HashMap` yields its keys in.
///
/// # Errors
///
/// Returns an error when the lock guarding the in-memory map is poisoned.
pub async fn get_active_sessions(&self) -> Result<Vec<SessionId>> {
    let guard = self
        .active_sessions
        .read()
        .map_err(|poison| anyhow!("Failed to acquire read lock on active sessions: {}", poison))?;

    let mut ids: Vec<SessionId> = Vec::with_capacity(guard.len());
    ids.extend(guard.keys().cloned());
    Ok(ids)
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
/// Per-test fixture: a `SessionManager` rooted in its own temporary directory.
struct TestContext {
    /// Kept only so the directory outlives the manager; removed on drop.
    _temp_dir: TempDir,
    manager: SessionManager,
}

impl TestContext {
    /// Creates a fresh temporary directory and a manager whose storage is
    /// rooted there.
    async fn new() -> Result<Self> {
        let scratch = TempDir::new()?;
        let root = scratch.path().to_path_buf();
        let storage = GlobalStorage::new_with_root(root)?;
        let manager = SessionManager::new(storage).await?;
        Ok(Self {
            _temp_dir: scratch,
            manager,
        })
    }

    /// Config for a workflow session with the given workflow id and no name,
    /// job id, or metadata.
    fn workflow_config(&self, id: &str) -> SessionConfig {
        let workflow_id = Some(id.to_string());
        SessionConfig {
            session_type: super::super::state::SessionType::Workflow,
            workflow_id,
            workflow_name: None,
            job_id: None,
            metadata: HashMap::new(),
        }
    }

    /// Config for a MapReduce session with the given job id and no workflow
    /// fields or metadata.
    fn mapreduce_config(&self, id: &str) -> SessionConfig {
        let job_id = Some(id.to_string());
        SessionConfig {
            session_type: super::super::state::SessionType::MapReduce,
            workflow_id: None,
            workflow_name: None,
            job_id,
            metadata: HashMap::new(),
        }
    }
}
/// A workflow session starts in `Initializing` and carries workflow data only.
#[tokio::test]
async fn test_create_workflow_session() -> Result<()> {
    let ctx = TestContext::new().await?;
    let id = ctx
        .manager
        .create_session(ctx.workflow_config("test-workflow"))
        .await?;
    assert!(!id.as_str().is_empty());

    let loaded = ctx.manager.load_session(&id).await?;
    assert_eq!(loaded.status, SessionStatus::Initializing);
    assert!(loaded.workflow_data.is_some());
    assert!(loaded.mapreduce_data.is_none());
    Ok(())
}
/// A MapReduce session starts in `Initializing` and carries MapReduce data only.
#[tokio::test]
async fn test_create_mapreduce_session() -> Result<()> {
    let ctx = TestContext::new().await?;
    let id = ctx
        .manager
        .create_session(ctx.mapreduce_config("test-job"))
        .await?;

    let loaded = ctx.manager.load_session(&id).await?;
    assert_eq!(loaded.status, SessionStatus::Initializing);
    assert!(loaded.workflow_data.is_none());
    assert!(loaded.mapreduce_data.is_some());
    Ok(())
}
/// Creating a workflow session without a workflow id is rejected with a
/// descriptive error.
#[tokio::test]
async fn test_create_session_missing_workflow_id() {
    let ctx = TestContext::new().await.unwrap();
    let incomplete = SessionConfig {
        session_type: super::super::state::SessionType::Workflow,
        workflow_id: None,
        workflow_name: None,
        job_id: None,
        metadata: HashMap::new(),
    };

    let outcome = ctx.manager.create_session(incomplete).await;
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("Workflow ID required"));
}
/// Creating a MapReduce session without a job id is rejected with a
/// descriptive error.
#[tokio::test]
async fn test_create_session_missing_job_id() {
    let ctx = TestContext::new().await.unwrap();
    let incomplete = SessionConfig {
        session_type: super::super::state::SessionType::MapReduce,
        workflow_id: None,
        workflow_name: None,
        job_id: None,
        metadata: HashMap::new(),
    };

    let outcome = ctx.manager.create_session(incomplete).await;
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("Job ID required"));
}
/// Walks a session through start → pause → resume → complete and checks the
/// status after each step.
#[tokio::test]
async fn test_session_lifecycle() -> Result<()> {
    let ctx = TestContext::new().await?;
    let manager = &ctx.manager;
    let id = manager
        .create_session(ctx.workflow_config("lifecycle-test"))
        .await?;

    manager.start_session(&id).await?;
    let after_start = manager.load_session(&id).await?;
    assert_eq!(after_start.status, SessionStatus::Running);

    manager.pause_session(&id).await?;
    let after_pause = manager.load_session(&id).await?;
    assert_eq!(after_pause.status, SessionStatus::Paused);

    manager.resume_session(&id).await?;
    let after_resume = manager.load_session(&id).await?;
    assert_eq!(after_resume.status, SessionStatus::Running);

    let summary = manager.complete_session(&id, true).await?;
    assert_eq!(summary.status, SessionStatus::Completed);
    assert!(summary.duration.is_some());
    Ok(())
}
/// Completing a running session with `success = false` yields a `Failed`
/// summary that has a duration.
#[tokio::test]
async fn test_session_failure() -> Result<()> {
    let ctx = TestContext::new().await?;
    let manager = &ctx.manager;
    let id = manager
        .create_session(ctx.workflow_config("failure-test"))
        .await?;
    manager.start_session(&id).await?;

    let summary = manager.complete_session(&id, false).await?;
    assert_eq!(summary.status, SessionStatus::Failed);
    assert!(summary.duration.is_some());
    Ok(())
}
/// Metadata supplied through an update is readable from the session afterwards.
#[tokio::test]
async fn test_update_metadata() -> Result<()> {
    let ctx = TestContext::new().await?;
    let id = ctx
        .manager
        .create_session(ctx.workflow_config("metadata-test"))
        .await?;

    let entries = HashMap::from([
        ("key1".to_string(), serde_json::json!("value1")),
        ("key2".to_string(), serde_json::json!(42)),
    ]);
    ctx.manager
        .update_session(&id, SessionUpdate::Metadata(entries))
        .await?;

    let loaded = ctx.manager.load_session(&id).await?;
    let expected_first = serde_json::json!("value1");
    let expected_second = serde_json::json!(42);
    assert_eq!(loaded.metadata.get("key1"), Some(&expected_first));
    assert_eq!(loaded.metadata.get("key2"), Some(&expected_second));
    Ok(())
}
/// `files_changed_delta` metadata accumulates into the workflow's
/// `files_changed` counter across successive updates (5, then 5 + 3 = 8).
///
/// Both checks fail loudly when workflow data is missing, so neither can
/// pass vacuously.
#[tokio::test]
async fn test_update_files_changed_delta() -> Result<()> {
    let ctx = TestContext::new().await?;
    let config = ctx.workflow_config("files-changed-test");
    let session_id = ctx.manager.create_session(config).await?;

    // First delta: counter goes from its initial value to 5.
    let mut metadata = HashMap::new();
    metadata.insert("files_changed_delta".to_string(), serde_json::json!(5));
    ctx.manager
        .update_session(&session_id, SessionUpdate::Metadata(metadata))
        .await?;
    let session = ctx.manager.load_session(&session_id).await?;
    match &session.workflow_data {
        Some(workflow) => assert_eq!(workflow.files_changed, 5),
        None => panic!("Expected workflow data"),
    }

    // Second delta is added on top of the first.
    let mut metadata = HashMap::new();
    metadata.insert("files_changed_delta".to_string(), serde_json::json!(3));
    ctx.manager
        .update_session(&session_id, SessionUpdate::Metadata(metadata))
        .await?;
    let session = ctx.manager.load_session(&session_id).await?;
    match &session.workflow_data {
        Some(workflow) => assert_eq!(workflow.files_changed, 8),
        None => panic!("Expected workflow data"),
    }
    Ok(())
}
#[tokio::test]
async fn test_update_error() -> Result<()> {
let ctx = TestContext::new().await?;
let config = ctx.workflow_config("error-test");
let session_id = ctx.manager.create_session(config).await?;
ctx.manager.start_session(&session_id).await?;
ctx.manager
.update_session(&session_id, SessionUpdate::Error("Test error".to_string()))
.await?;
let session = ctx.manager.load_session(&session_id).await?;
assert_eq!(session.status, SessionStatus::Failed);
assert_eq!(session.error, Some("Test error".to_string()));
Ok(())
}
/// A progress update on a workflow session is reflected in `current_step`
/// and `total_steps`.
#[tokio::test]
async fn test_update_progress() -> Result<()> {
    let ctx = TestContext::new().await?;
    let id = ctx
        .manager
        .create_session(ctx.workflow_config("progress-test"))
        .await?;

    let progress = SessionUpdate::Progress {
        current: 3,
        total: 10,
    };
    ctx.manager.update_session(&id, progress).await?;

    let loaded = ctx.manager.load_session(&id).await?;
    match &loaded.workflow_data {
        Some(workflow) => {
            assert_eq!(workflow.current_step, 3);
            assert_eq!(workflow.total_steps, 10);
        }
        None => panic!("Expected workflow data"),
    }
    Ok(())
}
/// A checkpoint taken while running, then restored after a pause, brings
/// back the `Running` status and the metadata present at checkpoint time.
#[tokio::test]
async fn test_checkpoint_creation_and_restore() -> Result<()> {
    let ctx = TestContext::new().await?;
    let manager = &ctx.manager;
    let id = manager
        .create_session(ctx.workflow_config("checkpoint-test"))
        .await?;
    manager.start_session(&id).await?;

    // Give the session some state worth checkpointing.
    let mut entries = HashMap::new();
    entries.insert("test_key".to_string(), serde_json::json!("test_value"));
    manager
        .update_session(&id, SessionUpdate::Metadata(entries))
        .await?;

    let checkpoint_id = manager.create_checkpoint(&id).await?;
    assert!(!checkpoint_id.as_str().is_empty());

    let with_checkpoint = manager.load_session(&id).await?;
    assert_eq!(with_checkpoint.checkpoints.len(), 1);
    assert_eq!(with_checkpoint.checkpoints[0].id, checkpoint_id);

    // Move away from the checkpointed state, then roll back.
    manager.pause_session(&id).await?;
    manager.restore_checkpoint(&id, &checkpoint_id).await?;

    let restored = manager.load_session(&id).await?;
    assert_eq!(restored.status, SessionStatus::Running);
    let expected_value = serde_json::json!("test_value");
    assert_eq!(restored.metadata.get("test_key"), Some(&expected_value));
    Ok(())
}
/// After deletion a session can no longer be loaded.
#[tokio::test]
async fn test_delete_session() -> Result<()> {
    let ctx = TestContext::new().await?;
    let manager = &ctx.manager;
    let id = manager
        .create_session(ctx.workflow_config("delete-test"))
        .await?;

    // Loading must succeed before the delete.
    manager.load_session(&id).await?;

    manager.delete_session(&id).await?;

    let after_delete = manager.load_session(&id).await;
    assert!(after_delete.is_err());
    Ok(())
}
#[tokio::test]
async fn test_list_sessions() -> Result<()> {
let ctx = TestContext::new().await?;
let id1 = ctx
.manager
.create_session(ctx.workflow_config("list-test-1"))
.await?;
let id2 = ctx
.manager
.create_session(ctx.workflow_config("list-test-2"))
.await?;
let id3 = ctx
.manager
.create_session(ctx.mapreduce_config("list-test-3"))
.await?;
ctx.manager.start_session(&id1).await?;
ctx.manager.complete_session(&id2, true).await?;
let all_sessions = ctx.manager.list_sessions(None).await?;
assert!(all_sessions.len() >= 3);
let running_filter = SessionFilter {
status: Some(SessionStatus::Running),
..Default::default()
};
let running = ctx.manager.list_sessions(Some(running_filter)).await?;
assert_eq!(running.len(), 1);
assert_eq!(running[0].id, id1);
let completed_filter = SessionFilter {
status: Some(SessionStatus::Completed),
..Default::default()
};
let completed = ctx.manager.list_sessions(Some(completed_filter)).await?;
assert_eq!(completed.len(), 1);
assert_eq!(completed[0].id, id2);
let workflow_filter = SessionFilter {
session_type: Some(super::super::state::SessionType::Workflow),
..Default::default()
};
let workflows = ctx.manager.list_sessions(Some(workflow_filter)).await?;
assert!(workflows.len() >= 2);
let mapreduce_filter = SessionFilter {
session_type: Some(super::super::state::SessionType::MapReduce),
..Default::default()
};
let mapreduce = ctx.manager.list_sessions(Some(mapreduce_filter)).await?;
assert!(!mapreduce.is_empty());
assert!(mapreduce.iter().any(|s| s.id == id3));
Ok(())
}
/// A session written by one manager is visible to a second manager opened
/// on the same storage root.
///
/// Both managers are rooted in a private temporary directory so the test
/// neither reads nor pollutes the real global storage location.
#[tokio::test]
async fn test_session_persistence() -> Result<()> {
    let temp_dir = TempDir::new()?;
    let root = temp_dir.path().to_path_buf();

    // First manager: create and start the session, then drop the manager so
    // only the on-disk state survives.
    let session_id = {
        let storage = GlobalStorage::new_with_root(root.clone())?;
        let manager = SessionManager::new(storage).await?;
        let config = SessionConfig {
            session_type: super::super::state::SessionType::Workflow,
            workflow_id: Some("persistence-test".to_string()),
            workflow_name: None,
            job_id: None,
            metadata: HashMap::new(),
        };
        let id = manager.create_session(config).await?;
        manager.start_session(&id).await?;
        id
    };

    // Second manager starts with an empty in-memory map, so this load has to
    // come from storage.
    {
        let storage = GlobalStorage::new_with_root(root)?;
        let manager = SessionManager::new(storage).await?;
        let session = manager.load_session(&session_id).await?;
        assert_eq!(session.status, SessionStatus::Running);
    }
    Ok(())
}
}