use async_trait::async_trait;
use futures::stream::Stream;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::Mutex;
use crate::errors::{ClaudeError, Result};
use crate::internal::pool::{get_global_pool, init_global_pool, ConnectionPool, PoolConfig, WorkerGuard};
use super::Transport;
pub struct PooledTransport {
pool: Arc<ConnectionPool>,
guard: Option<WorkerGuard>,
#[allow(dead_code)]
options: crate::types::config::ClaudeAgentOptions,
ready: bool,
}
impl PooledTransport {
#[allow(dead_code)]
pub fn new(
pool_config: PoolConfig,
options: crate::types::config::ClaudeAgentOptions,
) -> Self {
let pool = Arc::new(ConnectionPool::new(pool_config, options.clone()));
Self {
pool,
guard: None,
options,
ready: false,
}
}
pub fn from_pool(pool: Arc<ConnectionPool>, options: crate::types::config::ClaudeAgentOptions) -> Self {
Self {
pool,
guard: None,
options,
ready: false,
}
}
#[allow(dead_code)]
pub fn take_stdout(&mut self) -> Option<Arc<Mutex<tokio::io::BufReader<tokio::process::ChildStdout>>>> {
self.guard.as_ref().and_then(|g| g.stdout())
}
}
#[async_trait]
impl Transport for PooledTransport {
async fn connect(&mut self) -> Result<()> {
if self.ready {
return Ok(());
}
if !self.pool.is_enabled() {
return Err(ClaudeError::Connection(crate::errors::ConnectionError::new(
"Connection pool is not enabled".to_string(),
)));
}
let guard = self.pool.acquire().await?;
self.guard = Some(guard);
self.ready = true;
Ok(())
}
async fn write(&mut self, data: &str) -> Result<()> {
let guard = self.guard.as_mut().ok_or_else(|| {
ClaudeError::Transport("Transport not connected".to_string())
})?;
guard.write(data).await
}
fn read_messages(
&mut self,
) -> Pin<Box<dyn Stream<Item = Result<serde_json::Value>> + Send + '_>> {
Box::pin(async_stream::stream! {
loop {
let guard = match self.guard.as_mut() {
Some(g) => g,
None => {
yield Err(ClaudeError::Transport("Transport not connected".to_string()));
break;
}
};
let mut line = String::new();
match guard.read_line(&mut line).await {
Ok(0) => {
break;
}
Ok(_) => {
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
match serde_json::from_str::<serde_json::Value>(trimmed) {
Ok(json) => {
yield Ok(json);
}
Err(e) => {
yield Err(ClaudeError::Transport(format!(
"Failed to parse JSON: {}",
e
)));
}
}
}
Err(e) => {
yield Err(e);
break;
}
}
}
})
}
fn read_raw_messages(
&mut self,
) -> Pin<Box<dyn Stream<Item = Result<String>> + Send + '_>> {
Box::pin(async_stream::stream! {
loop {
let guard = match self.guard.as_mut() {
Some(g) => g,
None => {
yield Err(ClaudeError::Transport("Transport not connected".to_string()));
break;
}
};
let mut line = String::new();
match guard.read_line(&mut line).await {
Ok(0) => {
break;
}
Ok(_) => {
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
yield Ok(trimmed.to_string());
}
Err(e) => {
yield Err(e);
break;
}
}
}
})
}
async fn close(&mut self) -> Result<()> {
self.guard = None;
self.ready = false;
Ok(())
}
fn is_ready(&self) -> bool {
self.ready
}
async fn end_input(&mut self) -> Result<()> {
Ok(())
}
}
#[allow(dead_code)]
pub async fn init_pool(config: PoolConfig, options: crate::types::config::ClaudeAgentOptions) -> Result<()> {
init_global_pool(config, options).await
}
#[allow(dead_code)]
pub async fn get_pool() -> Option<Arc<ConnectionPool>> {
get_global_pool().await
}