Skip to main content

ta_changeset/
session_channel.rs

// session_channel.rs — Session interaction protocol for human-agent communication.
//
// Defines the `SessionChannel` trait, which every frontend implements — CLI, Discord,
// Slack, email, or web app. The protocol is the same everywhere: TA doesn't care where
// a message came from, only that it is authenticated and routed to the right session.
//
// This is the core abstraction for Phase v0.3.1.2 (Interactive Session Orchestration).
8
9use std::fmt;
10use std::time::Duration;
11
12use chrono::{DateTime, Utc};
13use serde::{Deserialize, Serialize};
14use uuid::Uuid;
15
16/// A bidirectional channel between a human and a TA-mediated agent session.
17///
18/// Every interaction between human and TA is a message on a channel.
19/// The CLI is one channel. A Discord thread is another. The protocol is the same.
20pub trait SessionChannel: Send + Sync {
21    /// Display agent output to the human (streaming).
22    fn emit(&self, event: &SessionEvent) -> Result<(), SessionChannelError>;
23
24    /// Receive human input (blocks until available or timeout).
25    /// Returns `None` on timeout.
26    fn receive(&self, timeout: Duration) -> Result<Option<HumanInput>, SessionChannelError>;
27
28    /// Channel identity (for audit trail).
29    /// e.g., "cli:tty0", "discord:thread:123", "slack:C04:1234567890"
30    fn channel_id(&self) -> &str;
31}
32
33/// Events emitted from TA/agent to the human.
34#[derive(Debug, Clone, Serialize, Deserialize)]
35#[serde(tag = "event_type", rename_all = "snake_case")]
36pub enum SessionEvent {
37    /// Agent output (stdout or stderr).
38    AgentOutput {
39        stream: OutputStream,
40        content: String,
41    },
42
43    /// A draft is ready for review (checkpoint).
44    DraftReady {
45        draft_id: Uuid,
46        summary: String,
47        artifact_count: usize,
48    },
49
50    /// The goal is complete.
51    GoalComplete { goal_id: Uuid },
52
53    /// Agent is waiting for human guidance.
54    WaitingForInput { prompt: String },
55
56    /// Session status update (informational).
57    StatusUpdate { message: String },
58}
59
60impl fmt::Display for SessionEvent {
61    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62        match self {
63            SessionEvent::AgentOutput { stream, content } => {
64                write!(f, "[{}] {}", stream, content)
65            }
66            SessionEvent::DraftReady {
67                draft_id,
68                summary,
69                artifact_count,
70            } => {
71                write!(
72                    f,
73                    "Draft ready: {} ({} artifacts) — {}",
74                    draft_id, artifact_count, summary
75                )
76            }
77            SessionEvent::GoalComplete { goal_id } => {
78                write!(f, "Goal complete: {}", goal_id)
79            }
80            SessionEvent::WaitingForInput { prompt } => {
81                write!(f, "Waiting for input: {}", prompt)
82            }
83            SessionEvent::StatusUpdate { message } => {
84                write!(f, "Status: {}", message)
85            }
86        }
87    }
88}
89
90/// Output stream identifier (stdout vs stderr).
91#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
92#[serde(rename_all = "snake_case")]
93pub enum OutputStream {
94    StdOut,
95    StdErr,
96}
97
98impl fmt::Display for OutputStream {
99    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
100        match self {
101            OutputStream::StdOut => write!(f, "stdout"),
102            OutputStream::StdErr => write!(f, "stderr"),
103        }
104    }
105}
106
107/// Input from a human to the session.
108#[derive(Debug, Clone, Serialize, Deserialize)]
109#[serde(tag = "input_type", rename_all = "snake_case")]
110pub enum HumanInput {
111    /// Free-form guidance message injected into agent context.
112    Message { text: String },
113
114    /// Inline review: approve a draft artifact.
115    Approve {
116        draft_id: Uuid,
117        artifact_uri: Option<String>,
118    },
119
120    /// Inline review: reject a draft artifact with reason.
121    Reject {
122        draft_id: Uuid,
123        artifact_uri: Option<String>,
124        reason: String,
125    },
126
127    /// Abort the session.
128    Abort,
129}
130
131impl fmt::Display for HumanInput {
132    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
133        match self {
134            HumanInput::Message { text } => write!(f, "Message: {}", text),
135            HumanInput::Approve {
136                draft_id,
137                artifact_uri,
138            } => {
139                if let Some(uri) = artifact_uri {
140                    write!(f, "Approve {} in draft {}", uri, draft_id)
141                } else {
142                    write!(f, "Approve draft {}", draft_id)
143                }
144            }
145            HumanInput::Reject {
146                draft_id,
147                artifact_uri,
148                reason,
149            } => {
150                if let Some(uri) = artifact_uri {
151                    write!(f, "Reject {} in draft {}: {}", uri, draft_id, reason)
152                } else {
153                    write!(f, "Reject draft {}: {}", draft_id, reason)
154                }
155            }
156            HumanInput::Abort => write!(f, "Abort session"),
157        }
158    }
159}
160
161/// Errors from session channel operations.
162#[derive(Debug, thiserror::Error)]
163pub enum SessionChannelError {
164    #[error("I/O error: {0}")]
165    Io(#[from] std::io::Error),
166
167    #[error("channel closed")]
168    ChannelClosed,
169
170    #[error("session error: {0}")]
171    Other(String),
172}
173
174/// A persistent interactive session record linking a goal to channel state.
175///
176/// Tracks the lifecycle of a human-agent interactive session across CLI invocations.
177/// Serialized to JSON for persistence (same pattern as ReviewSession).
178#[derive(Debug, Clone, Serialize, Deserialize)]
179pub struct InteractiveSession {
180    /// Unique session identifier.
181    pub session_id: Uuid,
182
183    /// The GoalRun this session is attached to.
184    pub goal_id: Uuid,
185
186    /// Channel identity (e.g., "cli:tty0").
187    pub channel_id: String,
188
189    /// Agent identity (e.g., "claude-code").
190    pub agent_id: String,
191
192    /// Session lifecycle state.
193    pub state: InteractiveSessionState,
194
195    /// Session creation time.
196    pub created_at: DateTime<Utc>,
197
198    /// Last activity time.
199    pub updated_at: DateTime<Utc>,
200
201    /// Message log (persisted for audit and resume).
202    #[serde(default, skip_serializing_if = "Vec::is_empty")]
203    pub messages: Vec<SessionMessage>,
204
205    /// Associated draft IDs (drafts reviewed inline during this session).
206    #[serde(default, skip_serializing_if = "Vec::is_empty")]
207    pub draft_ids: Vec<Uuid>,
208}
209
210/// Session lifecycle states.
211#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
212#[serde(rename_all = "snake_case")]
213pub enum InteractiveSessionState {
214    /// Session is active (agent running, human connected).
215    Active,
216    /// Session is paused (agent suspended, can be resumed).
217    Paused,
218    /// Session completed successfully.
219    Completed,
220    /// Session was aborted by the human.
221    Aborted,
222}
223
224impl fmt::Display for InteractiveSessionState {
225    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
226        match self {
227            InteractiveSessionState::Active => write!(f, "active"),
228            InteractiveSessionState::Paused => write!(f, "paused"),
229            InteractiveSessionState::Completed => write!(f, "completed"),
230            InteractiveSessionState::Aborted => write!(f, "aborted"),
231        }
232    }
233}
234
235/// A logged message in a session (for audit and replay).
236#[derive(Debug, Clone, Serialize, Deserialize)]
237pub struct SessionMessage {
238    /// Who sent the message ("human", "agent", "ta-system").
239    pub sender: String,
240    /// Message content.
241    pub content: String,
242    /// When the message was sent.
243    pub timestamp: DateTime<Utc>,
244}
245
246impl InteractiveSession {
247    /// Create a new interactive session for a goal.
248    pub fn new(goal_id: Uuid, channel_id: String, agent_id: String) -> Self {
249        let now = Utc::now();
250        Self {
251            session_id: Uuid::new_v4(),
252            goal_id,
253            channel_id,
254            agent_id,
255            state: InteractiveSessionState::Active,
256            created_at: now,
257            updated_at: now,
258            messages: Vec::new(),
259            draft_ids: Vec::new(),
260        }
261    }
262
263    /// Record a message in the session log.
264    pub fn log_message(&mut self, sender: &str, content: &str) {
265        self.messages.push(SessionMessage {
266            sender: sender.to_string(),
267            content: content.to_string(),
268            timestamp: Utc::now(),
269        });
270        self.updated_at = Utc::now();
271    }
272
273    /// Record that a draft was reviewed inline during this session.
274    pub fn add_draft(&mut self, draft_id: Uuid) {
275        if !self.draft_ids.contains(&draft_id) {
276            self.draft_ids.push(draft_id);
277        }
278        self.updated_at = Utc::now();
279    }
280
281    /// Transition to a new state.
282    pub fn transition(
283        &mut self,
284        new_state: InteractiveSessionState,
285    ) -> Result<(), SessionChannelError> {
286        let valid = matches!(
287            (&self.state, &new_state),
288            (
289                InteractiveSessionState::Active,
290                InteractiveSessionState::Paused
291            ) | (
292                InteractiveSessionState::Active,
293                InteractiveSessionState::Completed
294            ) | (
295                InteractiveSessionState::Active,
296                InteractiveSessionState::Aborted
297            ) | (
298                InteractiveSessionState::Paused,
299                InteractiveSessionState::Active
300            ) | (
301                InteractiveSessionState::Paused,
302                InteractiveSessionState::Aborted
303            )
304        );
305
306        if !valid {
307            return Err(SessionChannelError::Other(format!(
308                "invalid session transition from {} to {}",
309                self.state, new_state
310            )));
311        }
312
313        self.state = new_state;
314        self.updated_at = Utc::now();
315        Ok(())
316    }
317
318    /// Check if the session is in an active or paused state (can still be used).
319    pub fn is_alive(&self) -> bool {
320        matches!(
321            self.state,
322            InteractiveSessionState::Active | InteractiveSessionState::Paused
323        )
324    }
325
326    /// Get elapsed time since session start.
327    pub fn elapsed(&self) -> chrono::Duration {
328        Utc::now() - self.created_at
329    }
330
331    /// Format elapsed time as a human-readable string.
332    pub fn elapsed_display(&self) -> String {
333        let dur = self.elapsed();
334        let secs = dur.num_seconds();
335        if secs < 60 {
336            format!("{}s", secs)
337        } else if secs < 3600 {
338            format!("{}m {}s", secs / 60, secs % 60)
339        } else {
340            format!("{}h {}m", secs / 3600, (secs % 3600) / 60)
341        }
342    }
343}
344
345/// Per-agent interactive configuration (loaded from YAML).
346///
347/// Extends the AgentLaunchConfig with interactive session settings.
348#[derive(Debug, Clone, Serialize, Deserialize, Default)]
349pub struct InteractiveConfig {
350    /// Whether interactive mode is available for this agent.
351    #[serde(default)]
352    pub enabled: bool,
353
354    /// Output capture mode: "pty", "pipe", or "log".
355    #[serde(default = "default_output_capture")]
356    pub output_capture: String,
357
358    /// Whether to allow human input injection during agent execution.
359    #[serde(default = "default_true")]
360    pub allow_human_input: bool,
361
362    /// Auto-exit condition: "idle_timeout: 300s" or "goal_complete".
363    #[serde(default)]
364    pub auto_exit_on: Option<String>,
365
366    /// Override launch command for resume (e.g., "claude --resume {session_id}").
367    #[serde(default)]
368    pub resume_cmd: Option<String>,
369}
370
/// Serde default for `InteractiveConfig::output_capture`: the "pipe" mode.
fn default_output_capture() -> String {
    String::from("pipe")
}
374
/// Serde default helper for boolean fields that should be on unless the
/// config explicitly turns them off.
fn default_true() -> bool {
    const ON_UNLESS_SPECIFIED: bool = true;
    ON_UNLESS_SPECIFIED
}
378
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a fresh session on the CLI channel for a random goal.
    fn cli_session() -> InteractiveSession {
        InteractiveSession::new(
            Uuid::new_v4(),
            "cli:tty0".to_string(),
            "claude-code".to_string(),
        )
    }

    #[test]
    fn new_interactive_session_is_active() {
        let session = cli_session();

        assert_eq!(session.state, InteractiveSessionState::Active);
        assert!(session.messages.is_empty());
        assert!(session.draft_ids.is_empty());
        assert!(session.is_alive());
    }

    #[test]
    fn log_message_adds_to_history() {
        let mut session = cli_session();

        session.log_message("human", "Focus on the auth module");
        session.log_message("agent", "Understood, working on auth");

        assert_eq!(session.messages.len(), 2);
        assert_eq!(session.messages[0].sender, "human");
        assert_eq!(session.messages[1].sender, "agent");
    }

    #[test]
    fn add_draft_deduplicates() {
        let mut session = cli_session();
        let draft_id = Uuid::new_v4();

        // Adding the same draft twice must only record it once.
        session.add_draft(draft_id);
        session.add_draft(draft_id);

        assert_eq!(session.draft_ids.len(), 1);
    }

    #[test]
    fn valid_transitions() {
        let mut session = cli_session();

        // Active → Paused
        session.transition(InteractiveSessionState::Paused).unwrap();
        assert_eq!(session.state, InteractiveSessionState::Paused);

        // Paused → Active
        session.transition(InteractiveSessionState::Active).unwrap();
        assert_eq!(session.state, InteractiveSessionState::Active);

        // Active → Completed
        session
            .transition(InteractiveSessionState::Completed)
            .unwrap();
        assert_eq!(session.state, InteractiveSessionState::Completed);
    }

    #[test]
    fn invalid_transition_returns_error() {
        let mut session = cli_session();
        session
            .transition(InteractiveSessionState::Completed)
            .unwrap();

        // Completed → Active should fail
        let outcome = session.transition(InteractiveSessionState::Active);
        assert!(outcome.is_err());
    }

    #[test]
    fn abort_from_active() {
        let mut session = cli_session();

        session
            .transition(InteractiveSessionState::Aborted)
            .unwrap();

        assert!(!session.is_alive());
    }

    #[test]
    fn abort_from_paused() {
        let mut session = cli_session();

        session.transition(InteractiveSessionState::Paused).unwrap();
        session
            .transition(InteractiveSessionState::Aborted)
            .unwrap();

        assert!(!session.is_alive());
    }

    #[test]
    fn session_serialization_round_trip() {
        let mut session = cli_session();
        session.log_message("human", "Test message");
        session.add_draft(Uuid::new_v4());

        let encoded = serde_json::to_string(&session).unwrap();
        let decoded: InteractiveSession = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.session_id, session.session_id);
        assert_eq!(decoded.goal_id, session.goal_id);
        assert_eq!(decoded.channel_id, session.channel_id);
        assert_eq!(decoded.agent_id, session.agent_id);
        assert_eq!(decoded.messages.len(), 1);
        assert_eq!(decoded.draft_ids.len(), 1);
    }

    #[test]
    fn session_event_display() {
        let output = SessionEvent::AgentOutput {
            stream: OutputStream::StdOut,
            content: "Hello world".to_string(),
        };
        assert_eq!(output.to_string(), "[stdout] Hello world");

        let waiting = SessionEvent::WaitingForInput {
            prompt: "What next?".to_string(),
        };
        assert_eq!(waiting.to_string(), "Waiting for input: What next?");
    }

    #[test]
    fn human_input_display() {
        let message = HumanInput::Message {
            text: "Focus on auth".to_string(),
        };
        assert_eq!(message.to_string(), "Message: Focus on auth");

        assert_eq!(HumanInput::Abort.to_string(), "Abort session");
    }

    #[test]
    fn output_stream_display() {
        assert_eq!(OutputStream::StdOut.to_string(), "stdout");
        assert_eq!(OutputStream::StdErr.to_string(), "stderr");
    }

    #[test]
    fn interactive_config_defaults() {
        // An empty document must fall back to the per-field serde defaults.
        let config: InteractiveConfig = serde_json::from_str("{}").unwrap();

        assert!(!config.enabled);
        assert_eq!(config.output_capture, "pipe");
        assert!(config.allow_human_input);
        assert!(config.auto_exit_on.is_none());
        assert!(config.resume_cmd.is_none());
    }

    #[test]
    fn interactive_config_from_yaml() {
        let yaml = r#"
enabled: true
output_capture: pty
allow_human_input: true
auto_exit_on: "idle_timeout: 300s"
resume_cmd: "claude --resume {session_id}"
"#;
        let config: InteractiveConfig = serde_yaml::from_str(yaml).unwrap();

        assert!(config.enabled);
        assert_eq!(config.output_capture, "pty");
        assert!(config.allow_human_input);
        assert_eq!(config.auto_exit_on.as_deref(), Some("idle_timeout: 300s"));
        assert_eq!(
            config.resume_cmd.as_deref(),
            Some("claude --resume {session_id}")
        );
    }

    #[test]
    fn elapsed_display_formatting() {
        let mut session = cli_session();

        // Just created — should show a very small duration.
        let fresh = session.elapsed_display();
        assert!(fresh.ends_with('s'));

        // Manually set created_at to the past for testing.
        session.created_at = Utc::now() - chrono::Duration::minutes(5);
        let aged = session.elapsed_display();
        assert!(aged.contains('m'));
    }

    #[test]
    fn session_event_serialization_round_trip() {
        let event = SessionEvent::DraftReady {
            draft_id: Uuid::new_v4(),
            summary: "Test draft".to_string(),
            artifact_count: 3,
        };

        let encoded = serde_json::to_string(&event).unwrap();
        let decoded: SessionEvent = serde_json::from_str(&encoded).unwrap();

        match decoded {
            SessionEvent::DraftReady { artifact_count, .. } => assert_eq!(artifact_count, 3),
            _ => panic!("Expected DraftReady variant"),
        }
    }

    #[test]
    fn human_input_serialization_round_trip() {
        let input = HumanInput::Reject {
            draft_id: Uuid::new_v4(),
            artifact_uri: Some("fs://workspace/main.rs".to_string()),
            reason: "needs error handling".to_string(),
        };

        let encoded = serde_json::to_string(&input).unwrap();
        let decoded: HumanInput = serde_json::from_str(&encoded).unwrap();

        match decoded {
            HumanInput::Reject { reason, .. } => assert_eq!(reason, "needs error handling"),
            _ => panic!("Expected Reject variant"),
        }
    }
}