use crate::config::SessionConfig;
use crate::error::ClaudeError;
use crate::error::Result;
use crate::process::ProcessHandle;
use crate::stream::JsonStreamParser;
use crate::stream::SingleJsonParser;
use crate::stream::TextParser;
use crate::types::Event;
use crate::types::OutputFormat;
use crate::types::Result as ClaudeResult;
use chrono::Utc;
use futures::StreamExt;
use std::sync::Arc;
use tempfile::NamedTempFile;
use tokio::io::AsyncBufReadExt;
use tokio::sync::Mutex;
use tokio::sync::RwLock;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tracing::debug;
use tracing::warn;
use uuid::Uuid;
/// A running (or finished) invocation of the external process, together with
/// the background tasks that consume its output.
///
/// The fields wrapped in `Arc` are shared with the tasks spawned by
/// `start_tasks`; those tasks publish the outcome into `result` / `error`,
/// which `wait` reads once the tasks have been joined.
pub struct Session {
/// Identifier of this session: the explicit id from the configuration, else
/// the resumed session id, else a freshly generated UUID (see `new`).
id: String,
/// Configuration the session was created with; `output_format` selects which
/// output handler is spawned.
config: SessionConfig,
/// UTC timestamp captured when the session value was constructed.
start_time: chrono::DateTime<Utc>,
/// Slot holding the child process handle, shared with the background task.
/// It is `None` once the handle has been taken out (by a handler or by `kill`).
process: Arc<Mutex<Option<ProcessHandle>>>,
/// Sender half of the event channel. Only created for
/// `OutputFormat::StreamingJson`; moved into the streaming task by `start_tasks`.
events_tx: Option<mpsc::UnboundedSender<Event>>,
/// Receiver half of the event channel, handed to the caller (once) through
/// `take_event_stream`.
events: Option<mpsc::UnboundedReceiver<Event>>,
/// Handles of the spawned background tasks; joined by `wait`, aborted on drop.
tasks: Vec<JoinHandle<()>>,
/// Final result published by the background task, if any.
result: Arc<RwLock<Option<ClaudeResult>>>,
/// Error published by the background task, if any. `wait` returns it in
/// preference to `result`.
error: Arc<RwLock<Option<ClaudeError>>>,
/// Temporary file attached through `set_mcp_temp_file`.
/// NOTE(review): presumably held only so the file outlives the session
/// (temp files are typically removed on drop) — confirm against the caller.
_mcp_temp_file: Option<NamedTempFile>,
}
impl Session {
pub async fn new(config: SessionConfig, process: ProcessHandle) -> Result<Self> {
let id = if let Some(ref id) = config.explicit_session_id {
id.clone()
} else if let Some(ref id) = config.resume_session_id {
id.clone()
} else {
Uuid::new_v4().to_string()
};
let (events_tx, events) = match config.output_format {
OutputFormat::StreamingJson => {
let (tx, rx) = mpsc::unbounded_channel();
(Some(tx), Some(rx))
}
_ => (None, None),
};
let process = Arc::new(Mutex::new(Some(process)));
let result = Arc::new(RwLock::new(None));
let error = Arc::new(RwLock::new(None));
let mut session = Self {
id,
config: config.clone(),
start_time: Utc::now(),
process: process.clone(),
events_tx,
events,
tasks: Vec::new(),
result: result.clone(),
error: error.clone(),
_mcp_temp_file: None,
};
session.start_tasks().await?;
Ok(session)
}
async fn start_tasks(&mut self) -> Result<()> {
let process = self.process.clone();
let result = self.result.clone();
let error = self.error.clone();
match self.config.output_format {
OutputFormat::StreamingJson => {
let events_tx = self
.events_tx
.take()
.expect("events_tx must exist for StreamingJson output format");
let result_clone = result.clone();
let task = tokio::spawn(async move {
if let Err(e) =
Self::handle_streaming_json(process, events_tx, result_clone, error.clone())
.await
{
error.write().await.replace(e);
}
});
self.tasks.push(task);
}
OutputFormat::Json => {
let task = tokio::spawn(async move {
match Self::handle_json(process, error.clone()).await {
Ok(r) => {
result.write().await.replace(r);
}
Err(e) => {
error.write().await.replace(e);
}
}
});
self.tasks.push(task);
}
OutputFormat::Text => {
let task = tokio::spawn(async move {
match Self::handle_text(process, error.clone()).await {
Ok(r) => {
result.write().await.replace(r);
}
Err(e) => {
error.write().await.replace(e);
}
}
});
self.tasks.push(task);
}
}
Ok(())
}
/// Consumes a `StreamingJson` process: forwards every parsed event to
/// `events_tx`, mirrors the `Event::Result` event into `result_arc`, and
/// records a `ProcessFailed` error in `error` when the process fails.
///
/// Changes compared with the previous implementation:
///
/// * The process mutex is only locked briefly (to detach the pipes, and later
///   to reclaim the handle). Previously the guard lived for the whole function
///   and the handle was removed from the slot up front, so `kill`, `interrupt`
///   and `is_running` blocked until the process had finished and then saw
///   `None`.
/// * The stderr collector task is joined before returning. It used to be
///   detached, so `wait` could observe the error slot before stderr had been
///   recorded.
/// * A non-zero exit now reports the real exit code together with the captured
///   stderr text.
///
/// # Errors
///
/// * `SessionError("Process already taken")` if the slot is empty on entry or
///   the handle was removed (e.g. by `kill`) while the stream was consumed.
/// * `SessionError` if stdout or stderr is unavailable.
/// * Any error returned by waiting on the process.
async fn handle_streaming_json(
    process: Arc<Mutex<Option<ProcessHandle>>>,
    events_tx: mpsc::UnboundedSender<Event>,
    result_arc: Arc<RwLock<Option<ClaudeResult>>>,
    error: Arc<RwLock<Option<ClaudeError>>>,
) -> Result<()> {
    // Detach the pipes while leaving the handle in the shared slot so the
    // session can still signal or kill the process during streaming.
    let (stdout, stderr) = {
        let mut slot = process.lock().await;
        let handle = slot.as_mut().ok_or_else(|| ClaudeError::SessionError {
            message: "Process already taken".to_string(),
        })?;
        let stdout = handle
            .take_stdout()
            .ok_or_else(|| ClaudeError::SessionError {
                message: "No stdout reader".to_string(),
            })?;
        let stderr = handle
            .take_stderr()
            .ok_or_else(|| ClaudeError::SessionError {
                message: "No stderr reader".to_string(),
            })?;
        (stdout, stderr)
    };

    // Drain stderr concurrently so the child never blocks on a full pipe; the
    // collected text is returned through the join handle.
    let stderr_task = tokio::spawn(async move {
        let mut collected = String::new();
        let mut lines = stderr.lines();
        while let Ok(Some(line)) = lines.next_line().await {
            collected.push_str(&line);
            collected.push('\n');
        }
        collected
    });

    {
        let parser = JsonStreamParser::new(stdout);
        let stream = parser.into_event_stream();
        tokio::pin!(stream);
        while let Some(parsed) = stream.next().await {
            match parsed {
                Ok(event) => {
                    if let Event::Result(ref result_event) = event {
                        let claude_result = ClaudeResult {
                            result_type: Some("result".to_string()),
                            subtype: None,
                            session_id: Some(result_event.session_id.clone()),
                            result: result_event.result.clone(),
                            content: result_event.result.clone(),
                            is_error: result_event.is_error,
                            error: result_event.error.clone(),
                            total_cost_usd: result_event.total_cost_usd,
                            duration_ms: result_event.duration_ms,
                            duration_api_ms: result_event.duration_api_ms,
                            num_turns: result_event.num_turns,
                            exit_code: None,
                            usage: result_event.usage.clone(),
                        };
                        result_arc.write().await.replace(claude_result);
                    }
                    if events_tx.send(event).is_err() {
                        debug!("Event receiver dropped, stopping stream");
                        break;
                    }
                }
                Err(e) => {
                    // A malformed line is logged and skipped; it does not end the stream.
                    warn!("Failed to parse JSON event: {}", e);
                }
            }
        }
        // The stream goes out of scope here.
        // NOTE(review): this is expected to release the stdout reader so a child
        // that is still writing is not left blocked on a full pipe — confirm
        // that the stream owns the reader.
    }
    drop(events_tx);

    // Reclaim the handle; the temporary guard is released at the end of this
    // statement, so the lock is not held while waiting for the exit status.
    let reclaimed = process.lock().await.take();
    let Some(mut handle) = reclaimed else {
        return Err(ClaudeError::SessionError {
            message: "Process already taken".to_string(),
        });
    };
    let status = handle.wait().await?;

    // If the collector task itself failed, fall back to "no stderr output".
    let stderr_content = stderr_task.await.unwrap_or_default();
    let has_stderr = !stderr_content.trim().is_empty();

    if !status.success() {
        let code = status.code().unwrap_or(-1);
        let stderr = if has_stderr {
            stderr_content
        } else {
            "Process exited with non-zero status".to_string()
        };
        error
            .write()
            .await
            .replace(ClaudeError::ProcessFailed { code, stderr });
    } else if has_stderr {
        // Behavior kept from the previous implementation: any stderr output is
        // reported as a failure (code -1) even when the exit status is success.
        // NOTE(review): confirm this is intended for warnings printed to stderr.
        error.write().await.replace(ClaudeError::ProcessFailed {
            code: -1,
            stderr: stderr_content,
        });
    }
    Ok(())
}
/// Consumes a `Json` process: parses stdout/stderr with `SingleJsonParser`,
/// waits for the process to exit and returns the parsed result.
///
/// The process mutex is only locked briefly — once to detach the pipes and
/// once to reclaim the handle. Previously the guard was held for the whole
/// function and the handle was removed from the slot up front, so `kill`,
/// `interrupt` and `is_running` blocked until the process had finished and
/// then saw `None`.
///
/// `_error` is accepted for signature parity with the streaming handler and
/// is not used.
///
/// # Errors
///
/// * `SessionError("Process already taken")` if the slot is empty on entry or
///   the handle was removed (e.g. by `kill`) while output was being parsed.
/// * `SessionError` if stdout or stderr is unavailable.
/// * Any error from the parser or from waiting on the process.
/// * `ProcessFailed` if the process exits unsuccessfully while the parsed
///   result is not itself flagged as an error.
async fn handle_json(
    process: Arc<Mutex<Option<ProcessHandle>>>,
    _error: Arc<RwLock<Option<ClaudeError>>>,
) -> Result<ClaudeResult> {
    // Detach the pipes but leave the handle in the shared slot so the session
    // can still signal or kill the process while its output is parsed.
    let (stdout, stderr) = {
        let mut slot = process.lock().await;
        let handle = slot.as_mut().ok_or_else(|| ClaudeError::SessionError {
            message: "Process already taken".to_string(),
        })?;
        let stdout = handle
            .take_stdout()
            .ok_or_else(|| ClaudeError::SessionError {
                message: "No stdout reader".to_string(),
            })?;
        let stderr = handle
            .take_stderr()
            .ok_or_else(|| ClaudeError::SessionError {
                message: "No stderr reader".to_string(),
            })?;
        (stdout, stderr)
    };

    let parser = SingleJsonParser::new(stdout, stderr);
    let result = parser.parse().await?;

    // Reclaim the handle; the temporary guard is released at the end of this
    // statement, so the lock is not held while waiting for the exit status.
    let reclaimed = process.lock().await.take();
    let Some(mut handle) = reclaimed else {
        return Err(ClaudeError::SessionError {
            message: "Process already taken".to_string(),
        });
    };
    let status = handle.wait().await?;

    // A result that already reports an error is returned as-is; only an
    // otherwise unexplained failing exit status is turned into `ProcessFailed`.
    if !status.success() && !result.is_error {
        return Err(ClaudeError::ProcessFailed {
            code: status.code().unwrap_or(-1),
            stderr: result.error.unwrap_or_default(),
        });
    }
    Ok(result)
}
/// Consumes a `Text` process: parses stdout/stderr with `TextParser`, waits
/// for the process to exit and returns the parsed result.
///
/// The process mutex is only locked briefly — once to detach the pipes and
/// once to reclaim the handle. Previously the guard was held for the whole
/// function and the handle was removed from the slot up front, so `kill`,
/// `interrupt` and `is_running` blocked until the process had finished and
/// then saw `None`.
///
/// `_error` is accepted for signature parity with the streaming handler and
/// is not used.
///
/// # Errors
///
/// * `SessionError("Process already taken")` if the slot is empty on entry or
///   the handle was removed (e.g. by `kill`) while output was being parsed.
/// * `SessionError` if stdout or stderr is unavailable.
/// * Any error from the parser or from waiting on the process.
/// * `ProcessFailed` if the process exits unsuccessfully while the parsed
///   result is not itself flagged as an error.
async fn handle_text(
    process: Arc<Mutex<Option<ProcessHandle>>>,
    _error: Arc<RwLock<Option<ClaudeError>>>,
) -> Result<ClaudeResult> {
    // Detach the pipes but leave the handle in the shared slot so the session
    // can still signal or kill the process while its output is parsed.
    let (stdout, stderr) = {
        let mut slot = process.lock().await;
        let handle = slot.as_mut().ok_or_else(|| ClaudeError::SessionError {
            message: "Process already taken".to_string(),
        })?;
        let stdout = handle
            .take_stdout()
            .ok_or_else(|| ClaudeError::SessionError {
                message: "No stdout reader".to_string(),
            })?;
        let stderr = handle
            .take_stderr()
            .ok_or_else(|| ClaudeError::SessionError {
                message: "No stderr reader".to_string(),
            })?;
        (stdout, stderr)
    };

    let parser = TextParser::new(stdout, stderr);
    let result = parser.parse().await?;

    // Reclaim the handle; the temporary guard is released at the end of this
    // statement, so the lock is not held while waiting for the exit status.
    let reclaimed = process.lock().await.take();
    let Some(mut handle) = reclaimed else {
        return Err(ClaudeError::SessionError {
            message: "Process already taken".to_string(),
        });
    };
    let status = handle.wait().await?;

    // A result that already reports an error is returned as-is; only an
    // otherwise unexplained failing exit status is turned into `ProcessFailed`.
    if !status.success() && !result.is_error {
        return Err(ClaudeError::ProcessFailed {
            code: status.code().unwrap_or(-1),
            stderr: result.error.unwrap_or_default(),
        });
    }
    Ok(result)
}
pub async fn wait(mut self) -> Result<ClaudeResult> {
for task in self.tasks.drain(..) {
let _ = task.await;
}
if let Some(error) = self.error.write().await.take() {
return Err(error);
}
self.result
.read()
.await
.clone()
.ok_or_else(|| ClaudeError::SessionError {
message: "No result available".to_string(),
})
}
/// Removes the process handle from the shared slot and kills the process.
///
/// Does nothing (and succeeds) when the slot is already empty. The slot lock
/// is held until the kill call has completed.
///
/// # Errors
///
/// Propagates the error returned by the handle's `kill`.
pub async fn kill(&mut self) -> Result<()> {
    let mut slot = self.process.lock().await;
    let Some(mut handle) = slot.take() else {
        return Ok(());
    };
    handle.kill().await?;
    Ok(())
}
/// Sends `SIGINT` to the process, if its handle is still in the shared slot
/// and reports a process id.
///
/// The process id is converted to `pid_t` with a checked conversion and must
/// be strictly positive: `kill(2)` interprets zero and negative values as
/// process-group targets, which a wrapping `as` cast could produce.
///
/// # Errors
///
/// * `SessionError("Process not found or already terminated")` when there is
///   no handle or it has no process id.
/// * `SessionError("Failed to send interrupt signal: …")` when the id cannot
///   be used as a signal target or the `kill` call fails.
pub async fn interrupt(&mut self) -> Result<()> {
    // The guard stays alive for the whole call so the handle cannot be taken
    // out of the slot between reading the pid and sending the signal.
    let mut slot = self.process.lock().await;
    let Some(pid) = slot.as_mut().and_then(|process| process.id()) else {
        return Err(ClaudeError::SessionError {
            message: "Process not found or already terminated".to_string(),
        });
    };

    let target = libc::pid_t::try_from(pid)
        .ok()
        .filter(|raw| *raw > 0)
        .ok_or_else(|| ClaudeError::SessionError {
            message: format!(
                "Failed to send interrupt signal: process id {pid} is not a valid signal target"
            ),
        })?;

    // SAFETY: `kill` only takes two plain integers and dereferences no memory;
    // `target` is a strictly positive pid, so exactly one process is addressed.
    let rc = unsafe { libc::kill(target, libc::SIGINT) };
    if rc == 0 {
        return Ok(());
    }

    // Read errno immediately, before any other call can overwrite it.
    let os_error = std::io::Error::last_os_error();
    Err(ClaudeError::SessionError {
        message: format!("Failed to send interrupt signal: {}", os_error),
    })
}
/// Returns the session identifier as a string slice.
pub fn id(&self) -> &str {
    self.id.as_str()
}
/// Returns the UTC timestamp stored in the session's `start_time` field.
pub fn start_time(&self) -> chrono::DateTime<Utc> {
self.start_time
}
/// Reports whether the shared process slot still holds a handle.
///
/// This only inspects the slot (after acquiring its lock); it does not query
/// the operating system for the state of the process.
pub async fn is_running(&self) -> bool {
    self.process.lock().await.is_some()
}
/// Hands out the receiving end of the event channel.
///
/// Returns `Some` at most once; afterwards (or when no channel was created
/// for this session) it returns `None`.
pub fn take_event_stream(&mut self) -> Option<mpsc::UnboundedReceiver<Event>> {
    std::mem::take(&mut self.events)
}
/// Stores `temp_file` in the session, dropping any file stored previously.
pub fn set_mcp_temp_file(&mut self, temp_file: NamedTempFile) {
    drop(self._mcp_temp_file.replace(temp_file));
}
}
/// Aborts every background task still tracked in `tasks` when the session is
/// dropped. Tasks already drained by `wait` are not affected.
impl Drop for Session {
    fn drop(&mut self) {
        self.tasks.iter().for_each(|handle| handle.abort());
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::SessionConfig;
    use crate::error::ClaudeError;
    use crate::types::OutputFormat;

    /// Builds a `Session` with no process, no tasks and no result, whose
    /// shared error slot is pre-populated with `error`.
    fn idle_session(error: Option<ClaudeError>) -> Session {
        let cfg = SessionConfig::builder("test".to_string())
            .output_format(OutputFormat::Text)
            .build()
            .unwrap();
        Session {
            id: "test".into(),
            config: cfg,
            start_time: Utc::now(),
            process: Arc::new(Mutex::new(None)),
            events_tx: None,
            events: None,
            tasks: vec![],
            result: Arc::new(RwLock::new(None)),
            error: Arc::new(RwLock::new(error)),
            _mcp_temp_file: None,
        }
    }

    /// `wait` hands back a stored `ProcessFailed` with code and stderr intact.
    #[tokio::test]
    async fn wait_returns_processfailed_preserving_stderr() {
        let session = idle_session(Some(ClaudeError::ProcessFailed {
            code: 1,
            stderr: "stderr details".into(),
        }));
        match session.wait().await.unwrap_err() {
            ClaudeError::ProcessFailed { code, stderr } => {
                assert_eq!(code, 1);
                assert!(stderr.contains("stderr details"));
            }
            other => panic!("expected ProcessFailed, got {other:?}"),
        }
    }

    /// `wait` hands back a stored `SessionError` with its message intact.
    #[tokio::test]
    async fn wait_returns_sessionerror_preserving_message() {
        let session = idle_session(Some(ClaudeError::SessionError {
            message: "custom session error".into(),
        }));
        match session.wait().await.unwrap_err() {
            ClaudeError::SessionError { message } => {
                assert_eq!(message, "custom session error")
            }
            other => panic!("expected SessionError, got {other:?}"),
        }
    }

    /// `wait` hands back a stored I/O error with kind and text intact.
    #[tokio::test]
    async fn wait_returns_ioerror_preserving_source() {
        let io = std::io::Error::other("disk full");
        let session = idle_session(Some(io.into()));
        match session.wait().await.unwrap_err() {
            ClaudeError::IoError { source } => {
                assert_eq!(source.kind(), std::io::ErrorKind::Other);
                assert!(source.to_string().contains("disk full"));
            }
            other => panic!("expected IoError, got {other:?}"),
        }
    }

    /// With neither a result nor an error recorded, `wait` reports that no
    /// result is available.
    #[tokio::test]
    async fn wait_returns_no_result_available_when_result_and_error_missing() {
        let session = idle_session(None);
        match session.wait().await.unwrap_err() {
            ClaudeError::SessionError { message } => {
                assert_eq!(message, "No result available")
            }
            other => panic!("expected SessionError, got {other:?}"),
        }
    }
}