use super::validation::{validate_status_transition, SessionTransitionError};
use crate::unified_session::{SessionStatus, UnifiedSession};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
#[derive(Debug, Clone, PartialEq)]
pub enum SessionUpdateError {
InvalidTransition(SessionTransitionError),
MissingSessionData { session_type: String },
}
impl std::fmt::Display for SessionUpdateError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
SessionUpdateError::InvalidTransition(err) => write!(f, "{}", err),
SessionUpdateError::MissingSessionData { session_type } => {
write!(f, "Missing {} session data", session_type)
}
}
}
}
impl std::error::Error for SessionUpdateError {}
impl From<SessionTransitionError> for SessionUpdateError {
fn from(err: SessionTransitionError) -> Self {
SessionUpdateError::InvalidTransition(err)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SessionUpdate {
Status(SessionStatus),
Progress(ProgressUpdate),
Variables(HashMap<String, Value>),
AddStep(StepRecord),
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ProgressUpdate {
pub completed_steps: usize,
pub failed_steps: usize,
pub current_step: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StepRecord {
pub command: String,
pub started_at: DateTime<Utc>,
pub completed_at: Option<DateTime<Utc>>,
pub status: String,
pub output: Option<String>,
}
impl StepRecord {
pub fn started(command: impl Into<String>) -> Self {
Self {
command: command.into(),
started_at: Utc::now(),
completed_at: None,
status: "running".to_string(),
output: None,
}
}
pub fn complete(self, output: Option<String>) -> Self {
Self {
completed_at: Some(Utc::now()),
status: "completed".to_string(),
output,
..self
}
}
pub fn fail(self, error: impl Into<String>) -> Self {
Self {
completed_at: Some(Utc::now()),
status: "failed".to_string(),
output: Some(error.into()),
..self
}
}
}
pub fn apply_session_update(
session: UnifiedSession,
update: SessionUpdate,
) -> Result<UnifiedSession, SessionUpdateError> {
let updated = UnifiedSession {
updated_at: Utc::now(),
..session
};
match update {
SessionUpdate::Status(status) => apply_status_update(updated, status),
SessionUpdate::Progress(progress) => apply_progress_update(updated, progress),
SessionUpdate::Variables(vars) => apply_variable_update(updated, vars),
SessionUpdate::AddStep(step) => apply_add_step(updated, step),
}
}
pub fn apply_status_update(
session: UnifiedSession,
status: SessionStatus,
) -> Result<UnifiedSession, SessionUpdateError> {
validate_status_transition(&session.status, &status)?;
let completed_at = match &status {
SessionStatus::Completed | SessionStatus::Failed | SessionStatus::Cancelled => {
Some(Utc::now())
}
_ => session.completed_at,
};
Ok(UnifiedSession {
status,
completed_at,
..session
})
}
pub fn apply_progress_update(
session: UnifiedSession,
progress: ProgressUpdate,
) -> Result<UnifiedSession, SessionUpdateError> {
let workflow_data = session.workflow_data.map(|mut wd| {
for _ in 0..progress.completed_steps {
if wd.current_step < wd.total_steps {
wd.completed_steps.push(wd.current_step);
wd.current_step += 1;
}
}
wd
});
let mapreduce_data = session.mapreduce_data.map(|mut md| {
md.processed_items = md.processed_items.saturating_add(progress.completed_steps);
md.failed_items = md.failed_items.saturating_add(progress.failed_steps);
md
});
let metadata = if let Some(ref step) = progress.current_step {
let mut new_metadata = session.metadata.clone();
new_metadata.insert("current_step".to_string(), Value::String(step.clone()));
new_metadata
} else {
session.metadata.clone()
};
Ok(UnifiedSession {
workflow_data,
mapreduce_data,
metadata,
..session
})
}
pub fn apply_variable_update(
session: UnifiedSession,
new_vars: HashMap<String, Value>,
) -> Result<UnifiedSession, SessionUpdateError> {
let mut metadata = session.metadata.clone();
metadata.extend(new_vars);
let workflow_data = session.workflow_data.map(|mut wd| {
for (key, value) in metadata.iter() {
if let Some(s) = value.as_str() {
wd.variables.insert(key.clone(), s.to_string());
}
}
wd
});
Ok(UnifiedSession {
metadata,
workflow_data,
..session
})
}
pub fn apply_add_step(
session: UnifiedSession,
step: StepRecord,
) -> Result<UnifiedSession, SessionUpdateError> {
let mut metadata = session.metadata.clone();
let steps_key = "execution_steps".to_string();
let mut steps: Vec<Value> = metadata
.get(&steps_key)
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default();
let step_value = serde_json::to_value(&step).unwrap_or(Value::Null);
steps.push(step_value);
metadata.insert(steps_key, Value::Array(steps));
Ok(UnifiedSession {
metadata,
..session
})
}
pub fn apply_updates(
session: UnifiedSession,
updates: Vec<SessionUpdate>,
) -> Result<UnifiedSession, SessionUpdateError> {
updates.into_iter().try_fold(session, apply_session_update)
}
#[cfg(test)]
mod tests {
use super::*;
fn create_test_session() -> UnifiedSession {
let mut session =
UnifiedSession::new_workflow("test-workflow".to_string(), "test".to_string());
if let Some(ref mut wd) = session.workflow_data {
wd.total_steps = 10;
}
session
}
fn create_test_mapreduce_session() -> UnifiedSession {
UnifiedSession::new_mapreduce("test-job".to_string(), 100)
}
#[test]
fn test_apply_status_update_initializing_to_running() {
let session = create_test_session();
assert_eq!(session.status, SessionStatus::Initializing);
let result = apply_session_update(session, SessionUpdate::Status(SessionStatus::Running));
assert!(result.is_ok());
let updated = result.unwrap();
assert_eq!(updated.status, SessionStatus::Running);
assert!(updated.completed_at.is_none());
}
#[test]
fn test_apply_status_update_running_to_completed() {
let mut session = create_test_session();
session.status = SessionStatus::Running;
let result = apply_session_update(session, SessionUpdate::Status(SessionStatus::Completed));
assert!(result.is_ok());
let updated = result.unwrap();
assert_eq!(updated.status, SessionStatus::Completed);
assert!(updated.completed_at.is_some());
}
#[test]
fn test_apply_status_update_running_to_failed() {
let mut session = create_test_session();
session.status = SessionStatus::Running;
let result = apply_session_update(session, SessionUpdate::Status(SessionStatus::Failed));
assert!(result.is_ok());
let updated = result.unwrap();
assert_eq!(updated.status, SessionStatus::Failed);
assert!(updated.completed_at.is_some());
}
#[test]
fn test_apply_status_update_running_to_paused() {
let mut session = create_test_session();
session.status = SessionStatus::Running;
let result = apply_session_update(session, SessionUpdate::Status(SessionStatus::Paused));
assert!(result.is_ok());
let updated = result.unwrap();
assert_eq!(updated.status, SessionStatus::Paused);
assert!(updated.completed_at.is_none());
}
#[test]
fn test_apply_status_update_paused_to_running() {
let mut session = create_test_session();
session.status = SessionStatus::Paused;
let result = apply_session_update(session, SessionUpdate::Status(SessionStatus::Running));
assert!(result.is_ok());
let updated = result.unwrap();
assert_eq!(updated.status, SessionStatus::Running);
}
#[test]
fn test_apply_status_update_paused_to_cancelled() {
let mut session = create_test_session();
session.status = SessionStatus::Paused;
let result = apply_session_update(session, SessionUpdate::Status(SessionStatus::Cancelled));
assert!(result.is_ok());
let updated = result.unwrap();
assert_eq!(updated.status, SessionStatus::Cancelled);
assert!(updated.completed_at.is_some());
}
#[test]
fn test_apply_status_update_invalid_completed_to_running() {
let mut session = create_test_session();
session.status = SessionStatus::Completed;
let result = apply_session_update(session, SessionUpdate::Status(SessionStatus::Running));
assert!(result.is_err());
let err = result.unwrap_err();
assert!(matches!(err, SessionUpdateError::InvalidTransition(_)));
}
#[test]
fn test_apply_status_update_invalid_initializing_to_completed() {
let session = create_test_session();
let result = apply_session_update(session, SessionUpdate::Status(SessionStatus::Completed));
assert!(result.is_err());
}
#[test]
fn test_apply_progress_update_workflow() {
let session = create_test_session();
let result = apply_session_update(
session,
SessionUpdate::Progress(ProgressUpdate {
completed_steps: 3,
failed_steps: 0,
current_step: Some("step-4".to_string()),
}),
);
assert!(result.is_ok());
let updated = result.unwrap();
let wd = updated.workflow_data.unwrap();
assert_eq!(wd.completed_steps.len(), 3);
assert_eq!(wd.current_step, 3);
assert_eq!(
updated.metadata.get("current_step"),
Some(&Value::String("step-4".to_string()))
);
}
#[test]
fn test_apply_progress_update_mapreduce() {
let session = create_test_mapreduce_session();
let result = apply_session_update(
session,
SessionUpdate::Progress(ProgressUpdate {
completed_steps: 10,
failed_steps: 2,
current_step: None,
}),
);
assert!(result.is_ok());
let updated = result.unwrap();
let md = updated.mapreduce_data.unwrap();
assert_eq!(md.processed_items, 10);
assert_eq!(md.failed_items, 2);
}
#[test]
fn test_apply_progress_update_accumulates() {
let mut session = create_test_session();
session = apply_session_update(
session,
SessionUpdate::Progress(ProgressUpdate {
completed_steps: 3,
failed_steps: 0,
current_step: None,
}),
)
.unwrap();
session = apply_session_update(
session,
SessionUpdate::Progress(ProgressUpdate {
completed_steps: 2,
failed_steps: 1,
current_step: None,
}),
)
.unwrap();
let wd = session.workflow_data.unwrap();
assert_eq!(wd.completed_steps.len(), 5); }
#[test]
fn test_apply_variable_update_new_vars() {
let session = create_test_session();
let mut new_vars = HashMap::new();
new_vars.insert("key1".to_string(), Value::String("value1".to_string()));
new_vars.insert("key2".to_string(), Value::Number(42.into()));
let result = apply_session_update(session, SessionUpdate::Variables(new_vars));
assert!(result.is_ok());
let updated = result.unwrap();
assert_eq!(updated.metadata.len(), 2);
assert_eq!(
updated.metadata.get("key1"),
Some(&Value::String("value1".to_string()))
);
assert_eq!(
updated.metadata.get("key2"),
Some(&Value::Number(42.into()))
);
}
#[test]
fn test_apply_variable_update_merges_with_existing() {
let mut session = create_test_session();
session
.metadata
.insert("existing".to_string(), Value::String("old".to_string()));
let mut new_vars = HashMap::new();
new_vars.insert(
"new_key".to_string(),
Value::String("new_value".to_string()),
);
let result = apply_session_update(session, SessionUpdate::Variables(new_vars));
assert!(result.is_ok());
let updated = result.unwrap();
assert_eq!(updated.metadata.len(), 2);
assert!(updated.metadata.contains_key("existing"));
assert!(updated.metadata.contains_key("new_key"));
}
#[test]
fn test_apply_variable_update_overwrites_existing() {
let mut session = create_test_session();
session
.metadata
.insert("key".to_string(), Value::String("old".to_string()));
let mut new_vars = HashMap::new();
new_vars.insert("key".to_string(), Value::String("new".to_string()));
let result = apply_session_update(session, SessionUpdate::Variables(new_vars));
assert!(result.is_ok());
let updated = result.unwrap();
assert_eq!(
updated.metadata.get("key"),
Some(&Value::String("new".to_string()))
);
}
#[test]
fn test_apply_add_step() {
let session = create_test_session();
let step = StepRecord {
command: "echo hello".to_string(),
started_at: Utc::now(),
completed_at: None,
status: "running".to_string(),
output: None,
};
let result = apply_session_update(session, SessionUpdate::AddStep(step));
assert!(result.is_ok());
let updated = result.unwrap();
let steps = updated
.metadata
.get("execution_steps")
.and_then(|v| v.as_array())
.unwrap();
assert_eq!(steps.len(), 1);
}
#[test]
fn test_apply_add_step_preserves_order() {
let mut session = create_test_session();
let step1 = StepRecord::started("step 1");
session = apply_session_update(session, SessionUpdate::AddStep(step1)).unwrap();
let step2 = StepRecord::started("step 2");
session = apply_session_update(session, SessionUpdate::AddStep(step2)).unwrap();
let step3 = StepRecord::started("step 3");
session = apply_session_update(session, SessionUpdate::AddStep(step3)).unwrap();
let steps = session
.metadata
.get("execution_steps")
.and_then(|v| v.as_array())
.unwrap();
assert_eq!(steps.len(), 3);
assert_eq!(
steps[0].get("command").and_then(|v| v.as_str()),
Some("step 1")
);
assert_eq!(
steps[1].get("command").and_then(|v| v.as_str()),
Some("step 2")
);
assert_eq!(
steps[2].get("command").and_then(|v| v.as_str()),
Some("step 3")
);
}
#[test]
fn test_step_record_lifecycle() {
let step = StepRecord::started("test command");
assert_eq!(step.status, "running");
assert!(step.completed_at.is_none());
let completed = step.clone().complete(Some("output".to_string()));
assert_eq!(completed.status, "completed");
assert!(completed.completed_at.is_some());
assert_eq!(completed.output, Some("output".to_string()));
let failed = StepRecord::started("failing command").fail("error message");
assert_eq!(failed.status, "failed");
assert!(failed.completed_at.is_some());
assert_eq!(failed.output, Some("error message".to_string()));
}
#[test]
fn test_apply_updates_sequence() {
let session = create_test_session();
let updates = vec![
SessionUpdate::Status(SessionStatus::Running),
SessionUpdate::Progress(ProgressUpdate {
completed_steps: 5,
failed_steps: 0,
current_step: Some("step-5".to_string()),
}),
SessionUpdate::Variables({
let mut m = HashMap::new();
m.insert("result".to_string(), Value::String("success".to_string()));
m
}),
];
let result = apply_updates(session, updates);
assert!(result.is_ok());
let updated = result.unwrap();
assert_eq!(updated.status, SessionStatus::Running);
assert!(updated.metadata.contains_key("result"));
}
#[test]
fn test_apply_updates_stops_on_error() {
let session = create_test_session();
let updates = vec![
SessionUpdate::Status(SessionStatus::Running),
SessionUpdate::Status(SessionStatus::Initializing),
SessionUpdate::Progress(ProgressUpdate {
completed_steps: 100,
failed_steps: 0,
current_step: None,
}),
];
let result = apply_updates(session, updates);
assert!(result.is_err());
}
#[test]
fn test_updates_preserve_immutability() {
let original = create_test_session();
let original_id = original.id.clone();
let original_status = original.status.clone();
let updated = apply_session_update(
original.clone(),
SessionUpdate::Status(SessionStatus::Running),
)
.unwrap();
assert_eq!(original.id, original_id);
assert_eq!(original.status, original_status);
assert_eq!(original.status, SessionStatus::Initializing);
assert_eq!(updated.id, original_id);
assert_eq!(updated.status, SessionStatus::Running);
}
#[test]
fn test_updated_at_always_changes() {
let session = create_test_session();
let original_updated_at = session.updated_at;
std::thread::sleep(std::time::Duration::from_millis(10));
let updated = apply_session_update(
session,
SessionUpdate::Variables(HashMap::new()), )
.unwrap();
assert!(updated.updated_at > original_updated_at);
}
#[test]
fn test_progress_update_no_overflow() {
let mut session = create_test_session();
if let Some(ref mut wd) = session.workflow_data {
wd.total_steps = 5;
}
let result = apply_session_update(
session,
SessionUpdate::Progress(ProgressUpdate {
completed_steps: 100,
failed_steps: 0,
current_step: None,
}),
);
assert!(result.is_ok());
let updated = result.unwrap();
let wd = updated.workflow_data.unwrap();
assert!(wd.completed_steps.len() <= 5);
}
#[test]
fn test_empty_variable_update() {
let mut session = create_test_session();
session
.metadata
.insert("key".to_string(), Value::String("value".to_string()));
let result = apply_session_update(session, SessionUpdate::Variables(HashMap::new()));
assert!(result.is_ok());
let updated = result.unwrap();
assert!(updated.metadata.contains_key("key"));
}
}
#[cfg(test)]
mod property_tests {
use super::*;
use proptest::prelude::*;
fn key_strategy() -> impl Strategy<Value = String> {
"[a-z][a-z0-9_]{0,19}".prop_map(|s| s)
}
fn value_strategy() -> impl Strategy<Value = String> {
"[a-zA-Z0-9 ]{0,50}".prop_map(|s| s)
}
fn create_test_session() -> UnifiedSession {
let mut session =
UnifiedSession::new_workflow("test-workflow".to_string(), "test".to_string());
if let Some(ref mut wd) = session.workflow_data {
wd.total_steps = 100;
}
session
}
proptest! {
#![proptest_config(ProptestConfig::with_cases(100))]
#[test]
fn prop_updates_preserve_session_id(
completed in 0usize..50,
failed in 0usize..50,
) {
let session = create_test_session();
let original_id = session.id.clone();
let update = SessionUpdate::Progress(ProgressUpdate {
completed_steps: completed,
failed_steps: failed,
current_step: None,
});
let result = apply_session_update(session, update);
prop_assert!(result.is_ok());
prop_assert_eq!(result.unwrap().id, original_id);
}
#[test]
fn prop_variable_updates_always_succeed(
keys in prop::collection::vec(key_strategy(), 0..10),
values in prop::collection::vec(value_strategy(), 0..10),
) {
let session = create_test_session();
let mut new_vars = HashMap::new();
for (key, value) in keys.into_iter().zip(values.into_iter()) {
new_vars.insert(key, Value::String(value));
}
let result = apply_session_update(session, SessionUpdate::Variables(new_vars));
prop_assert!(result.is_ok());
}
#[test]
fn prop_variable_merge_includes_all_keys(
existing_keys in prop::collection::vec(key_strategy(), 0..5),
existing_values in prop::collection::vec(value_strategy(), 0..5),
new_keys in prop::collection::vec(key_strategy(), 0..5),
new_values in prop::collection::vec(value_strategy(), 0..5),
) {
let mut session = create_test_session();
for (key, value) in existing_keys.iter().zip(existing_values.iter()) {
session.metadata.insert(key.clone(), Value::String(value.clone()));
}
let mut new_vars = HashMap::new();
for (key, value) in new_keys.iter().zip(new_values.iter()) {
new_vars.insert(key.clone(), Value::String(value.clone()));
}
let result = apply_session_update(session, SessionUpdate::Variables(new_vars.clone()));
prop_assert!(result.is_ok());
let updated = result.unwrap();
for key in new_vars.keys() {
prop_assert!(updated.metadata.contains_key(key));
}
}
#[test]
fn prop_progress_updates_monotonic(
updates in prop::collection::vec(0usize..10, 1..5),
) {
let mut session = create_test_session();
let mut total_completed = 0;
for completed in updates {
session = apply_session_update(
session,
SessionUpdate::Progress(ProgressUpdate {
completed_steps: completed,
failed_steps: 0,
current_step: None,
}),
).unwrap();
total_completed += completed;
let wd = session.workflow_data.as_ref().unwrap();
prop_assert!(wd.completed_steps.len() <= total_completed);
}
}
#[test]
fn prop_steps_never_lost(
commands in prop::collection::vec("[a-z]{1,10}", 1..10),
) {
let mut session = create_test_session();
for command in &commands {
let step = StepRecord::started(command.clone());
session = apply_session_update(session, SessionUpdate::AddStep(step)).unwrap();
}
let steps = session
.metadata
.get("execution_steps")
.and_then(|v| v.as_array())
.unwrap();
prop_assert_eq!(steps.len(), commands.len());
}
#[test]
fn prop_updated_at_changes(
key in key_strategy(),
value in value_strategy(),
) {
let session = create_test_session();
let original_updated = session.updated_at;
std::thread::sleep(std::time::Duration::from_millis(1));
let mut vars = HashMap::new();
vars.insert(key, Value::String(value));
let result = apply_session_update(session, SessionUpdate::Variables(vars));
prop_assert!(result.is_ok());
let updated = result.unwrap();
prop_assert!(updated.updated_at >= original_updated);
}
}
}