# 1.3 Session Persistence System
**Deliverable**: Comprehensive state management, persistence formats, and recovery mechanisms
**Status**: ✅ Implemented - Full checkpoint/resume functionality with atomic persistence and recovery
## Overview
The session persistence system lets agent operations resume after interruptions, restarts, and crashes. Each state file is replaced atomically, and periodic checkpoints combined with an incremental update log limit how much work is lost. Incremental updates, lazy loading, and optional compression keep I/O, memory use, and storage small.
## State Architecture
### Core State Components
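`SessionState` combines session metadata (ID, timestamps, configuration) with five state components, each with distinct persistence requirements: the task tree, the Claude context, the file system, the execution logs, and resource usage. The first three are defined below: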
```rust
use std::collections::HashMap;

// chrono needs its "serde" feature enabled for DateTime<Utc> to (de)serialize.
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct SessionState {
    pub session_id: SessionId,
    pub created_at: DateTime<Utc>,
    pub last_checkpoint: DateTime<Utc>,
    pub configuration: SessionConfiguration,
    pub task_tree: TaskTreeState,
    pub claude_context: ClaudeContextState,
    pub file_system: FileSystemState,
    pub execution_logs: ExecutionLogState,
    pub resource_usage: ResourceUsageState,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct TaskTreeState {
    pub tasks: HashMap<TaskId, Task>,
    pub root_tasks: Vec<TaskId>,
    pub active_task: Option<TaskId>,
    pub task_counter: u64,
    pub dependency_graph: DependencyGraph,
    pub scheduling_state: SchedulingState,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct ClaudeContextState {
    // Identifier of the Claude Code session, separate from SessionState::session_id
    pub session_id: String,
    pub conversation_history: ConversationHistory,
    pub context_windows: Vec<ContextWindow>,
    pub rate_limit_state: RateLimitState,
    pub model_configuration: ModelConfiguration,
    pub usage_tracking: UsageTracking,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct FileSystemState {
    pub workspace_snapshot: WorkspaceSnapshot,
    pub file_modifications: Vec<FileModification>,
    pub build_artifacts: HashMap<String, BuildArtifact>,
    pub dependency_resolution: DependencyResolution,
    pub git_state: GitState,
}
```
## Persistence Architecture
### Storage Layout
Session data lives in a hidden `.aca` directory at the workspace root, with one subdirectory per session. Each component is stored in its own file, so it can be replaced atomically without rewriting the rest of the session:
```text
.aca/
└── sessions/{session_id}/
    ├── meta/
    │   ├── session.json              # Session metadata and configuration
    │   ├── checkpoint_manifest.json  # Checkpoint history and validation
    │   └── recovery_info.json        # Recovery instructions and state
    │
    ├── state/
    │   ├── task_tree.json            # Complete task hierarchy and status
    │   ├── scheduling_state.json     # Task scheduler state and queues
    │   └── dependency_graph.json     # Task dependencies and relationships
    │
    ├── claude/
    │   ├── session_config.json       # Claude Code session configuration
    │   ├── conversation/             # Conversation history in chunks
    │   │   ├── messages_001.json
    │   │   ├── messages_002.json
    │   │   └── current.json
    │   ├── context_windows/          # Maintained context windows
    │   │   ├── window_001.json
    │   │   └── window_002.json
    │   └── rate_limit/
    │       ├── usage_history.json
    │       └── current_limits.json
    │
    ├── filesystem/
    │   ├── workspace_snapshot.json   # File system state tracking
    │   ├── modifications/            # Change tracking and history
    │   │   ├── changes_001.json
    │   │   └── changes_002.json
    │   ├── build_artifacts/          # Build results and artifacts
    │   │   ├── compilation_results.json
    │   │   └── test_results.json
    │   └── git_state.json            # Git repository state
    │
    ├── logs/
    │   ├── execution/                # Task execution logs
    │   │   ├── task_{task_id}.json
    │   │   └── system_events.json
    │   ├── claude_interactions/      # Claude Code API interactions
    │   │   ├── requests_001.json
    │   │   └── responses_001.json
    │   └── errors/                   # Error logs and stack traces
    │       ├── error_001.json
    │       └── recovery_actions.json
    │
    └── checkpoints/                  # Point-in-time snapshots
        ├── checkpoint_001/
        │   ├── state_snapshot.json
        │   └── manifest.json
        └── checkpoint_002/
            ├── state_snapshot.json
            └── manifest.json
```
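To make the layout concrete, here is a sketch of how component paths could be derived from it. The `StateComponent` variants, the `.tmp` suffix for temporary files, and the function names are illustrative assumptions; only the directory and file names come from the layout.

```rust
use std::path::{Path, PathBuf};

/// Hypothetical set of persisted components; variant names are illustrative.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum StateComponent {
    SessionMeta,
    CheckpointManifest,
    TaskTree,
    SchedulingState,
    DependencyGraph,
    ClaudeSessionConfig,
    WorkspaceSnapshot,
    GitState,
}

/// Path of a component relative to `.aca/sessions/{session_id}/`.
fn relative_path(component: StateComponent) -> &'static str {
    match component {
        StateComponent::SessionMeta => "meta/session.json",
        StateComponent::CheckpointManifest => "meta/checkpoint_manifest.json",
        StateComponent::TaskTree => "state/task_tree.json",
        StateComponent::SchedulingState => "state/scheduling_state.json",
        StateComponent::DependencyGraph => "state/dependency_graph.json",
        StateComponent::ClaudeSessionConfig => "claude/session_config.json",
        StateComponent::WorkspaceSnapshot => "filesystem/workspace_snapshot.json",
        StateComponent::GitState => "filesystem/git_state.json",
    }
}

/// Directory holding all files of one session.
pub fn session_dir(workspace_root: &Path, session_id: &str) -> PathBuf {
    workspace_root.join(".aca").join("sessions").join(session_id)
}

/// Final location of a component file.
pub fn component_path(session_dir: &Path, component: StateComponent) -> PathBuf {
    session_dir.join(relative_path(component))
}

/// Temporary sibling used during an atomic write. It sits in the same
/// directory as the target so the final rename never crosses filesystems.
pub fn temp_path(session_dir: &Path, component: StateComponent) -> PathBuf {
    let mut path = component_path(session_dir, component);
    let mut name = path
        .file_name()
        .expect("component paths end in a file name")
        .to_os_string();
    name.push(".tmp");
    path.set_file_name(name);
    path
}
```

Keeping the temporary file next to its target is the design choice that matters here: a rename is only atomic within a single filesystem.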
### Benefits of `.aca` Directory Structure
The hidden `.aca` directory provides several advantages:
- **Clean Workspace**: Keeps session metadata separate from the project's own files
- **Familiar Convention**: Follows the dot-directory pattern used by `.git`, `.vscode`, and `.cargo`
- **Multi-Session Support**: Each session gets its own `sessions/{session_id}/` directory, so several sessions can coexist without conflicts
- **Easy Maintenance**: All session-related data lives in one location
- **Version Control Friendly**: A single `.aca/` entry in `.gitignore` excludes it when desired
- **Backup/Restore**: The entire session history can be backed up by copying one directory
### Atomic Persistence Operations
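Each component file is replaced atomically: the data is written to a temporary file, which is then renamed over the target. A process crash mid-write therefore leaves either the old or the new version on disk, never a partially written file: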
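```rust
use std::io::ErrorKind;
use std::path::PathBuf;
use std::sync::Arc;

use serde::{de::DeserializeOwned, Serialize};
use tokio::fs;
use tokio::io::AsyncWriteExt;
use tokio::sync::Mutex;

// `Result` is the crate's error alias; its error type is assumed to convert
// from std::io::Error, serde_json::Error and PersistenceError.
pub struct AtomicPersistence {
    session_dir: PathBuf,
    // Serializes writers within this process only.
    write_lock: Arc<Mutex<()>>,
    compression_enabled: bool,
}

impl AtomicPersistence {
    pub async fn persist_state<T: Serialize>(
        &self,
        component: StateComponent,
        data: &T,
    ) -> Result<()> {
        let _lock = self.write_lock.lock().await;

        // The temp file must live in the same directory as the target so the
        // rename below stays on one filesystem and is atomic.
        let temp_path = self.get_temp_path(&component);
        let target_path = self.get_component_path(&component);

        // Serialize data
        let serialized = if self.compression_enabled {
            self.serialize_compressed(data)?
        } else {
            serde_json::to_vec_pretty(data)?
        };

        // Write, flush and fsync the temp file before renaming it; without
        // the fsync, a power loss can leave the renamed file empty.
        let mut file = fs::File::create(&temp_path).await?;
        file.write_all(&serialized).await?;
        file.flush().await?;
        file.sync_all().await?;
        drop(file);

        // Atomically replace the previous version of the component
        fs::rename(&temp_path, &target_path).await?;

        // Update checkpoint manifest. This is a second write that is not
        // atomic with the rename; a crash in between leaves the manifest
        // one version behind.
        self.update_checkpoint_manifest(&component).await?;
        Ok(())
    }

    pub async fn load_state<T: DeserializeOwned>(
        &self,
        component: StateComponent,
    ) -> Result<T> {
        let path = self.get_component_path(&component);

        // Read directly and map NotFound, instead of a separate (blocking,
        // racy) existence check followed by the read.
        let data = match fs::read(&path).await {
            Ok(data) => data,
            Err(e) if e.kind() == ErrorKind::NotFound => {
                return Err(PersistenceError::ComponentNotFound(component).into());
            }
            Err(e) => return Err(e.into()),
        };

        let deserialized = if self.compression_enabled {
            self.deserialize_compressed(&data)?
        } else {
            serde_json::from_slice(&data)?
        };
        Ok(deserialized)
    }
}
```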
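The write-then-rename pattern can be shown without an async runtime. This is a synchronous sketch using only `std`; `write_atomically` is a hypothetical name, the `.tmp` suffix is an assumed convention, and the parent-directory sync is a Unix-specific step that makes the rename itself durable.

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::Path;

/// Hypothetical helper: replace `target` with `bytes` so that a reader sees
/// either the old contents or the new ones, never a partial write.
pub fn write_atomically(target: &Path, bytes: &[u8]) -> io::Result<()> {
    let file_name = target.file_name().ok_or_else(|| {
        io::Error::new(io::ErrorKind::InvalidInput, "target has no file name")
    })?;
    // A bare file name has an empty parent; treat it as the current directory.
    let dir = match target.parent() {
        Some(parent) if !parent.as_os_str().is_empty() => parent,
        _ => Path::new("."),
    };

    // Temp file in the same directory, so rename() never crosses filesystems.
    let mut tmp_name = file_name.to_os_string();
    tmp_name.push(".tmp");
    let tmp_path = dir.join(tmp_name);

    let mut file = File::create(&tmp_path)?;
    file.write_all(bytes)?;
    // Force the data to disk before the rename makes it visible.
    file.sync_all()?;
    drop(file);

    // On POSIX systems rename() atomically replaces an existing target.
    fs::rename(&tmp_path, target)?;

    // Persist the directory entry change as well (Unix only).
    if cfg!(unix) {
        File::open(dir)?.sync_all()?;
    }
    Ok(())
}
```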
### Incremental State Updates
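Rather than rewriting a large component on every change, the system records small `StateUpdate` entries, each carrying a sequence number and checksum, and replays them on top of the last saved state. This keeps I/O proportional to the size of the change: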
```rust
use std::path::PathBuf;

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct StateUpdate {
    pub timestamp: DateTime<Utc>,
    pub component: StateComponent,
    pub operation: UpdateOperation,
    pub sequence_number: u64,
    pub checksum: String,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum UpdateOperation {
    TaskCreated { task: Task },
    TaskStatusChanged { task_id: TaskId, old_status: TaskStatus, new_status: TaskStatus },
    TaskModified { task_id: TaskId, changes: TaskChanges },
    FileModified { path: PathBuf, modification: FileModification },
    ConversationExtended { messages: Vec<ClaudeMessage> },
    DependencyAdded { from: TaskId, to: TaskId, dep_type: DependencyType },
}

impl SessionState {
    /// Applies one logged update. This method does not check
    /// `sequence_number` or `checksum`; verify them before calling it.
    pub async fn apply_incremental_update(&mut self, update: StateUpdate) -> Result<()> {
        match update.operation {
            UpdateOperation::TaskCreated { task } => {
                // Top-level tasks must also be registered as roots
                if task.parent_id.is_none() {
                    self.task_tree.root_tasks.push(task.id);
                }
                self.task_tree.tasks.insert(task.id, task);
                self.task_tree.task_counter += 1;
            }
            UpdateOperation::TaskStatusChanged { task_id, new_status, .. } => {
                if let Some(task) = self.task_tree.tasks.get_mut(&task_id) {
                    task.status = new_status;
                    task.updated_at = update.timestamp;
                }
            }
            UpdateOperation::TaskModified { task_id, changes } => {
                if let Some(task) = self.task_tree.tasks.get_mut(&task_id) {
                    // `apply_changes` is a hypothetical helper on Task
                    task.apply_changes(changes);
                    task.updated_at = update.timestamp;
                }
            }
            UpdateOperation::FileModified { path, modification } => {
                self.file_system.file_modifications.push(modification);
                self.file_system.workspace_snapshot.update_file(&path).await?;
            }
            UpdateOperation::ConversationExtended { messages } => {
                self.claude_context.conversation_history.extend(messages);
            }
            UpdateOperation::DependencyAdded { from, to, dep_type } => {
                // `add_dependency` is a hypothetical helper on DependencyGraph
                self.task_tree.dependency_graph.add_dependency(from, to, dep_type);
            }
        }
        self.last_checkpoint = update.timestamp;
        Ok(())
    }
}
```
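Because each `StateUpdate` carries a `sequence_number`, replay can be made idempotent and gap-checked instead of relying on timestamps alone. The sketch below uses a deliberately simplified, hypothetical state and update type (`MiniState`, `LoggedUpdate`, `Op`) to show the ordering rules; it is not the `SessionState` API.

```rust
/// Hypothetical, simplified update record: only what ordering needs.
#[derive(Clone, Debug, PartialEq)]
pub struct LoggedUpdate {
    pub sequence_number: u64,
    pub op: Op,
}

#[derive(Clone, Debug, PartialEq)]
pub enum Op {
    TaskCreated { id: u64 },
    TaskCompleted { id: u64 },
}

#[derive(Debug, Default, PartialEq)]
pub struct MiniState {
    pub open_tasks: Vec<u64>,
    pub completed_tasks: Vec<u64>,
    /// Sequence number of the last update folded into this state.
    pub applied_through: u64,
}

#[derive(Debug, PartialEq)]
pub enum ReplayError {
    Gap { expected: u64, found: u64 },
}

/// Replays updates newer than `state.applied_through`, in sequence order.
/// Entries already reflected in the state are skipped, so replaying the
/// same log twice is harmless; a missing number aborts the replay.
pub fn replay(state: &mut MiniState, mut log: Vec<LoggedUpdate>) -> Result<(), ReplayError> {
    log.sort_by_key(|u| u.sequence_number);
    for update in log {
        if update.sequence_number <= state.applied_through {
            continue; // already part of the checkpointed state
        }
        let expected = state.applied_through + 1;
        if update.sequence_number != expected {
            return Err(ReplayError::Gap { expected, found: update.sequence_number });
        }
        match update.op {
            Op::TaskCreated { id } => state.open_tasks.push(id),
            Op::TaskCompleted { id } => {
                state.open_tasks.retain(|&t| t != id);
                state.completed_tasks.push(id);
            }
        }
        state.applied_through = update.sequence_number;
    }
    Ok(())
}
```

Storing the last applied sequence number alongside a checkpoint is what makes the skip rule work; a timestamp filter can double-apply or miss updates that share a timestamp with the checkpoint.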
## Recovery Mechanisms
### Checkpoint System
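At a configurable interval, the system writes a full checkpoint. Restoring one means validating it, decompressing its snapshot, and replaying the incremental updates recorded after it: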
```rust
use std::path::PathBuf;

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use tokio::fs;

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Checkpoint {
    pub id: CheckpointId,
    pub created_at: DateTime<Utc>,
    pub session_id: SessionId,
    pub state_snapshot: CompressedStateSnapshot,
    pub manifest: CheckpointManifest,
    pub validation_hash: String,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct CheckpointManifest {
    pub total_tasks: u32,
    pub completed_tasks: u32,
    pub active_tasks: Vec<TaskId>,
    pub file_modifications_count: u32,
    pub conversation_messages_count: u32,
    pub storage_size_bytes: u64,
    pub dependencies: Vec<String>,
}

pub struct CheckpointManager {
    session_dir: PathBuf,
    retention_policy: CheckpointRetentionPolicy,
    compression_level: u32,
}

impl CheckpointManager {
    pub async fn create_checkpoint(&self, session: &SessionState) -> Result<CheckpointId> {
        // CheckpointId is assumed to implement AsRef<Path> (e.g. "checkpoint_001")
        let checkpoint_id = self.generate_checkpoint_id();
        let checkpoint_dir = self.session_dir.join("checkpoints").join(&checkpoint_id);
        fs::create_dir_all(&checkpoint_dir).await?;

        // Create compressed state snapshot and summary manifest
        let snapshot = self.create_state_snapshot(session).await?;
        let manifest = self.generate_manifest(session).await?;

        let checkpoint = Checkpoint {
            id: checkpoint_id.clone(),
            created_at: Utc::now(),
            session_id: session.session_id.clone(),
            state_snapshot: snapshot,
            manifest,
            validation_hash: self.calculate_validation_hash(session).await?,
        };

        // Write checkpoint atomically
        self.write_checkpoint(&checkpoint_dir, &checkpoint).await?;

        // Prune old checkpoints according to the retention policy
        self.apply_retention_policy().await?;

        Ok(checkpoint_id)
    }

    pub async fn restore_from_checkpoint(
        &self,
        checkpoint_id: &CheckpointId,
    ) -> Result<SessionState> {
        let checkpoint_path = self.get_checkpoint_path(checkpoint_id);
        let checkpoint = self.load_checkpoint(&checkpoint_path).await?;

        // Validate checkpoint integrity before trusting its contents
        self.validate_checkpoint(&checkpoint).await?;

        // Decompress and restore state
        let mut restored_state = self
            .decompress_state_snapshot(&checkpoint.state_snapshot)
            .await?;

        // Apply any incremental updates recorded since the checkpoint
        let updates = self.load_incremental_updates_since(&checkpoint.created_at).await?;
        for update in updates {
            restored_state.apply_incremental_update(update).await?;
        }

        Ok(restored_state)
    }
}
```
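The storage layout names checkpoint directories `checkpoint_001`, `checkpoint_002`, and so on. A retention policy over those names might look like the following sketch. Keeping the newest N checkpoints is an assumption, since `CheckpointRetentionPolicy` is not defined in this document, and the function names are hypothetical.

```rust
/// Formats the `checkpoint_NNN` directory names used in the storage layout.
pub fn checkpoint_dir_name(n: u32) -> String {
    format!("checkpoint_{:03}", n)
}

/// Parses a directory name back into its counter; other names yield None.
pub fn parse_checkpoint_dir_name(name: &str) -> Option<u32> {
    name.strip_prefix("checkpoint_")?.parse().ok()
}

/// Hypothetical keep-last-N policy: returns the directories to delete,
/// oldest first. Non-checkpoint entries are ignored.
pub fn checkpoints_to_prune(existing: &[String], keep_last: usize) -> Vec<String> {
    let mut numbered: Vec<(u32, &String)> = existing
        .iter()
        .filter_map(|name| parse_checkpoint_dir_name(name).map(|n| (n, name)))
        .collect();
    // Sort by the parsed counter, not the string, so that
    // checkpoint_1000 is newer than checkpoint_999.
    numbered.sort_by_key(|&(n, _)| n);
    let excess = numbered.len().saturating_sub(keep_last);
    numbered[..excess].iter().map(|&(_, name)| name.clone()).collect()
}
```

Numeric sorting matters once the counter outgrows its zero padding; a plain string sort would then place `checkpoint_1000` before `checkpoint_999` and prune the wrong one.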
### Crash Recovery
On startup, the system looks for evidence of a crash (incomplete writes, corrupted state files, orphaned resources) and, if it finds any, recovers using one of four strategies:
```rust
use std::path::PathBuf;
use std::sync::Arc;

use chrono::Utc;

#[derive(Debug, Clone)]
pub enum RecoveryStrategy {
    /// Restore from the latest checkpoint
    LatestCheckpoint,
    /// Restore from a specific checkpoint
    SpecificCheckpoint(CheckpointId),
    /// Reconstruct state from incremental updates
    IncrementalReconstruction,
    /// Partial recovery with user intervention
    PartialRecovery,
}

pub struct CrashRecoveryManager {
    session_dir: PathBuf,
    checkpoint_manager: Arc<CheckpointManager>,
    validation_rules: Vec<ValidationRule>,
}

impl CrashRecoveryManager {
    /// Returns Ok(None) when the session directory shows no sign of a crash.
    pub async fn detect_crash_state(&self) -> Result<Option<CrashState>> {
        // Check for incomplete write operations
        let incomplete_writes = self.find_incomplete_writes().await?;
        // Check for corrupted state files
        let corrupted_files = self.validate_state_files().await?;
        // Check for orphaned processes or resources
        let orphaned_resources = self.find_orphaned_resources().await?;

        if incomplete_writes.is_empty()
            && corrupted_files.is_empty()
            && orphaned_resources.is_empty()
        {
            return Ok(None);
        }

        Ok(Some(CrashState {
            incomplete_writes,
            corrupted_files,
            orphaned_resources,
            detected_at: Utc::now(),
        }))
    }

    pub async fn recover_session(
        &self,
        crash_state: CrashState,
        strategy: RecoveryStrategy,
    ) -> Result<SessionState> {
        match strategy {
            RecoveryStrategy::LatestCheckpoint => {
                self.recover_from_latest_checkpoint().await
            }
            RecoveryStrategy::SpecificCheckpoint(checkpoint_id) => {
                self.checkpoint_manager.restore_from_checkpoint(&checkpoint_id).await
            }
            RecoveryStrategy::IncrementalReconstruction => {
                self.reconstruct_from_incremental_updates().await
            }
            RecoveryStrategy::PartialRecovery => {
                self.perform_partial_recovery(crash_state).await
            }
        }
    }

    async fn perform_partial_recovery(&self, _crash_state: CrashState) -> Result<SessionState> {
        // Start with an empty session state; components not recovered
        // below keep their empty defaults.
        let mut session = SessionState::new_empty();

        // Recover task tree from available data
        if let Ok(task_data) = self.recover_task_tree_partial().await {
            session.task_tree = task_data;
        }

        // Recover Claude context if possible
        if let Ok(claude_data) = self.recover_claude_context_partial().await {
            session.claude_context = claude_data;
        }

        // Mark session as requiring validation before work resumes
        session.configuration.requires_validation = true;
        Ok(session)
    }
}
```
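With write-then-rename persistence, one cheap signal for `find_incomplete_writes` is a temporary file that survived the last run: its rename never happened. The following synchronous sketch assumes temporary files end in `.tmp`; that suffix and the free-function form are assumptions, not the documented API.

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

/// Recursively lists leftover `*.tmp` files under a session directory.
/// Each one marks a write that was interrupted before its final rename.
pub fn find_incomplete_writes(session_dir: &Path) -> io::Result<Vec<PathBuf>> {
    let mut found = Vec::new();
    // Explicit stack instead of recursion for the directory walk.
    let mut pending = vec![session_dir.to_path_buf()];
    while let Some(dir) = pending.pop() {
        for entry in fs::read_dir(&dir)? {
            let entry = entry?;
            let path = entry.path();
            if entry.file_type()?.is_dir() {
                pending.push(path);
            } else if path.extension().map_or(false, |ext| ext == "tmp") {
                found.push(path);
            }
        }
    }
    found.sort(); // deterministic order for reporting
    Ok(found)
}
```

Because the target file is only ever replaced by a rename, a leftover temporary file can usually be deleted: the component still holds its last complete version.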
## Performance Optimizations
### Lazy Loading
Large state components are loaded from disk only when first accessed and are then cached in memory until invalidated:
```rust
use std::any::Any;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;

use serde::de::DeserializeOwned;
use tokio::sync::Mutex;

pub struct LazyStateManager {
    session_dir: PathBuf,
    loaded_components: Arc<Mutex<HashMap<StateComponent, Arc<dyn Any + Send + Sync>>>>,
    access_patterns: Arc<Mutex<HashMap<StateComponent, AccessPattern>>>,
}

impl LazyStateManager {
    /// Returns a shared handle to a component, loading it on first access.
    /// StateComponent is assumed to be Copy + Eq + Hash.
    pub async fn get_component<T: DeserializeOwned + Send + Sync + 'static>(
        &self,
        component: StateComponent,
    ) -> Result<Arc<T>> {
        // Holding the lock across the disk read also stops two tasks from
        // loading the same component at once, at the cost of serializing loads.
        let mut loaded = self.loaded_components.lock().await;

        if let Some(cached) = loaded.get(&component) {
            // Arc::downcast hands back the cached Arc<T> itself: no deep clone
            if let Ok(typed) = Arc::clone(cached).downcast::<T>() {
                self.record_access(component).await;
                return Ok(typed);
            }
        }

        // Load component from disk; cache and return the same Arc
        let data: Arc<T> = Arc::new(self.load_from_disk(component).await?);
        loaded.insert(component, data.clone());
        self.record_access(component).await;
        Ok(data)
    }

    pub async fn invalidate_component(&self, component: StateComponent) {
        let mut loaded = self.loaded_components.lock().await;
        loaded.remove(&component);
    }
}
```
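The cache above stores components as `Arc<dyn Any + Send + Sync>`. The standard library's `Arc::downcast` recovers the typed `Arc<T>` from such an entry, so a cache hit costs a reference-count increment rather than a deep clone of `T`. This standalone sketch uses `std::sync::Mutex` and string keys for brevity; `TypedCache` and `get_or_load` are hypothetical names.

```rust
use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Minimal type-erased cache keyed by a component name.
#[derive(Default)]
pub struct TypedCache {
    entries: Mutex<HashMap<&'static str, Arc<dyn Any + Send + Sync>>>,
}

impl TypedCache {
    /// Returns the cached value for `key`, calling `load` only on a miss
    /// (or when the cached entry has a different type).
    pub fn get_or_load<T, F>(&self, key: &'static str, load: F) -> Arc<T>
    where
        T: Any + Send + Sync,
        F: FnOnce() -> T,
    {
        let mut entries = self.entries.lock().unwrap();
        if let Some(cached) = entries.get(key) {
            // Cloning the Arc only bumps the reference count.
            if let Ok(typed) = Arc::clone(cached).downcast::<T>() {
                return typed;
            }
        }
        let value: Arc<T> = Arc::new(load());
        // Arc<T> coerces to Arc<dyn Any + Send + Sync> on insertion.
        entries.insert(key, value.clone());
        value
    }

    /// Drops the cached entry so the next access reloads it.
    pub fn invalidate(&self, key: &str) {
        self.entries.lock().unwrap().remove(key);
    }
}
```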
### State Compression
State files above a size threshold are compressed with a configurable algorithm (gzip, zstd, or LZ4) to reduce storage requirements:
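```rust
use std::io::Write;

// `Result` is the crate's error alias; its error type must convert from std::io::Error.
pub struct StateCompression {
    compression_algorithm: CompressionAlgorithm,
    compression_level: u32,
    size_threshold: u64,
}

#[derive(Clone, Copy, Debug)]
pub enum CompressionAlgorithm {
    None,
    Gzip,
    Zstd,
    Lz4,
}

impl CompressionAlgorithm {
    /// One-byte tag stored ahead of the payload, so a reader knows which
    /// decoder to use, including for payloads stored uncompressed.
    fn tag(self) -> u8 {
        match self {
            CompressionAlgorithm::None => 0,
            CompressionAlgorithm::Gzip => 1,
            CompressionAlgorithm::Zstd => 2,
            CompressionAlgorithm::Lz4 => 3,
        }
    }
}

impl StateCompression {
    pub fn compress_data(&self, data: &[u8]) -> Result<Vec<u8>> {
        // Payloads below the threshold are stored as-is; the tag records that,
        // otherwise a reader could not tell raw from compressed bytes.
        let algorithm = if (data.len() as u64) < self.size_threshold {
            CompressionAlgorithm::None
        } else {
            self.compression_algorithm
        };

        let payload = match algorithm {
            CompressionAlgorithm::None => data.to_vec(),
            CompressionAlgorithm::Gzip => {
                use flate2::{write::GzEncoder, Compression};
                // flate2 levels range from 0 (none) to 9 (best)
                let mut encoder =
                    GzEncoder::new(Vec::new(), Compression::new(self.compression_level));
                encoder.write_all(data)?;
                encoder.finish()?
            }
            CompressionAlgorithm::Zstd => {
                zstd::encode_all(data, self.compression_level as i32)?
            }
            // Prepend the uncompressed size, which LZ4 block decoding needs
            CompressionAlgorithm::Lz4 => lz4_flex::compress_prepend_size(data),
        };

        let mut framed = Vec::with_capacity(payload.len() + 1);
        framed.push(algorithm.tag());
        framed.extend_from_slice(&payload);
        Ok(framed)
    }
}
```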
## Data Integrity
### Validation System
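A `StateValidator` checks loaded state against a set of rules (structural consistency, referential integrity, business logic, checksums), reports each failure with its severity, and suggests repairs: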
```rust
use std::collections::HashMap;

#[derive(Debug, Clone)]
pub struct ValidationRule {
    pub name: String,
    pub component: StateComponent,
    pub rule_type: ValidationRuleType,
    pub severity: ValidationSeverity,
}

// Hash + Eq are required to use this type as a HashMap key below
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ValidationRuleType {
    StructuralConsistency,
    ReferentialIntegrity,
    BusinessLogic,
    ChecksumValidation,
}

// PartialEq is required for the severity comparison below
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ValidationSeverity {
    Warning,
    Error,
    Critical,
}

pub struct StateValidator {
    rules: Vec<ValidationRule>,
    repair_strategies: HashMap<ValidationRuleType, RepairStrategy>,
}

impl StateValidator {
    pub async fn validate_session_state(&self, state: &SessionState) -> ValidationResult {
        let mut issues = Vec::new();

        for rule in &self.rules {
            if let Err(validation_error) = self.apply_validation_rule(rule, state).await {
                issues.push(ValidationIssue {
                    rule: rule.clone(),
                    error: validation_error,
                    suggested_repair: self.suggest_repair(rule).await,
                });
            }
        }

        // Compute everything that borrows `issues` before moving it into the result
        let is_valid = issues
            .iter()
            .all(|i| i.rule.severity != ValidationSeverity::Critical);
        let repair_suggestions = self.generate_repair_plan(&issues).await;

        ValidationResult {
            is_valid,
            issues,
            repair_suggestions,
        }
    }

    async fn validate_task_tree_consistency(
        &self,
        task_tree: &TaskTreeState,
    ) -> std::result::Result<(), ValidationError> {
        for (task_id, task) in &task_tree.tasks {
            // Validate parent-child relationships in both directions
            if let Some(parent_id) = task.parent_id {
                let parent = task_tree
                    .tasks
                    .get(&parent_id)
                    .ok_or(ValidationError::OrphanedTask(*task_id))?;
                if !parent.children.contains(task_id) {
                    return Err(ValidationError::InconsistentParentChild {
                        parent: parent_id,
                        child: *task_id,
                    });
                }
            }

            // Validate dependencies exist
            for &dep_id in &task.dependencies {
                if !task_tree.tasks.contains_key(&dep_id) {
                    return Err(ValidationError::MissingDependency {
                        task: *task_id,
                        dependency: dep_id,
                    });
                }
            }
        }

        // Validate no circular dependencies
        self.check_circular_dependencies(&task_tree.dependency_graph).await?;
        Ok(())
    }
}
```
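`check_circular_dependencies` is not shown above. A common way to implement such a check is a depth-first search that marks each node as in progress while its dependencies are explored and reports a cycle on reaching an in-progress node again. The sketch below models the graph as a `HashMap` from task ID to the IDs it depends on; that representation and the `u64` IDs are assumptions, since `DependencyGraph` is not defined here.

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq)]
enum Mark {
    Unvisited,
    InProgress,
    Done,
}

/// Returns true if the graph (task -> tasks it depends on) has a cycle.
/// Iterative three-colour DFS, so long dependency chains cannot overflow
/// the call stack. Nodes that appear only as dependencies have no edges.
pub fn has_cycle(graph: &HashMap<u64, Vec<u64>>) -> bool {
    let mut marks: HashMap<u64, Mark> = HashMap::new();
    for &start in graph.keys() {
        if marks.get(&start).copied().unwrap_or(Mark::Unvisited) != Mark::Unvisited {
            continue;
        }
        // Each stack entry is (node, index of the next edge to explore).
        let mut stack = vec![(start, 0usize)];
        marks.insert(start, Mark::InProgress);
        while let Some((node, next_edge)) = stack.last_mut() {
            let node = *node;
            let edges = graph.get(&node).map(Vec::as_slice).unwrap_or(&[]);
            if *next_edge < edges.len() {
                let dep = edges[*next_edge];
                *next_edge += 1;
                match marks.get(&dep).copied().unwrap_or(Mark::Unvisited) {
                    Mark::InProgress => return true, // back edge: cycle found
                    Mark::Unvisited => {
                        marks.insert(dep, Mark::InProgress);
                        stack.push((dep, 0));
                    }
                    Mark::Done => {} // already fully explored, no cycle through it
                }
            } else {
                // All dependencies explored: node is finished.
                marks.insert(node, Mark::Done);
                stack.pop();
            }
        }
    }
    false
}
```

The run time is linear in the number of tasks plus dependency edges, which keeps the check cheap enough to run on every restore.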
Together, atomic file replacement, checkpoints with incremental replay, and rule-based validation let long-running development sessions survive interruptions with limited lost work, and detect corrupted state before it is used.