pub mod connection;
pub mod registry;
pub mod terminal;
pub mod types;
use crate::operations::lookup_target;
use crate::service_error::ServiceError;
use connection::SshConnection;
use once_cell::sync::Lazy;
use registry::{ShellSession, ShellSessionRegistry};
use std::sync::Arc;
use tracing::info;
use types::*;
/// Process-wide session registry, constructed lazily on first access.
///
/// Shared by [`ShellSessionService::global`] and by the free `shell_session_*`
/// functions in this module, so sessions created through either are visible to
/// the other.
static GLOBAL_SESSION_REGISTRY: Lazy<Arc<ShellSessionRegistry>> =
Lazy::new(|| Arc::new(ShellSessionRegistry::new()));
/// Handle for managing interactive shell sessions against one registry.
///
/// Every method forwards to the matching `*_impl` function in this module,
/// passing the registry this handle holds.
pub struct ShellSessionService {
// Registry consulted by every operation on this handle; either a private
// one (`new`) or the process-wide one (`global`).
registry: Arc<ShellSessionRegistry>,
}
impl ShellSessionService {
    /// Builds a service backed by its own, freshly created registry.
    ///
    /// Sessions created through this instance are tracked separately from the
    /// ones held by the process-wide registry used by [`Self::global`].
    pub fn new() -> Self {
        let registry = Arc::new(ShellSessionRegistry::new());
        Self { registry }
    }

    /// Builds a service that shares the process-wide registry.
    pub fn global() -> Self {
        let registry = Arc::clone(&*GLOBAL_SESSION_REGISTRY);
        Self { registry }
    }

    /// Opens a new shell session and records it in this service's registry.
    pub async fn create(&self, args: ShellSessionCreateArgs) -> Result<ShellSessionCreateResult, ServiceError> {
        create_session_impl(self.registry.as_ref(), args).await
    }

    /// Sends input to a session held by this service's registry.
    pub async fn write(&self, args: ShellSessionWriteArgs) -> Result<ShellSessionWriteResult, ServiceError> {
        write_session_impl(self.registry.as_ref(), args).await
    }

    /// Reads output and/or screen state from a session held by this service's registry.
    pub async fn read(&self, args: ShellSessionReadArgs) -> Result<ShellSessionReadResult, ServiceError> {
        read_session_impl(self.registry.as_ref(), args).await
    }

    /// Lists the sessions in this service's registry that match the filters in `args`.
    pub async fn list(&self, args: ShellSessionListArgs) -> Result<ShellSessionListResult, ServiceError> {
        list_sessions_impl(self.registry.as_ref(), args).await
    }

    /// Attempts to reconnect a session.
    ///
    /// The underlying implementation is currently a stub that always returns
    /// an internal error.
    pub async fn reconnect(&self, args: ShellSessionReconnectArgs) -> Result<ShellSessionReconnectResult, ServiceError> {
        reconnect_session_impl(self.registry.as_ref(), args).await
    }

    /// Changes the terminal dimensions of a session held by this service's registry.
    pub async fn resize(&self, args: ShellSessionResizeArgs) -> Result<ShellSessionResizeResult, ServiceError> {
        resize_session_impl(self.registry.as_ref(), args).await
    }

    /// Removes a session from this service's registry and closes it.
    pub async fn close(&self, args: ShellSessionCloseArgs) -> Result<ShellSessionCloseResult, ServiceError> {
        close_session_impl(self.registry.as_ref(), args).await
    }
}
impl Default for ShellSessionService {
fn default() -> Self {
Self::new()
}
}
pub async fn shell_session_create(args: ShellSessionCreateArgs) -> Result<ShellSessionCreateResult, ServiceError> {
create_session_impl(&GLOBAL_SESSION_REGISTRY, args).await
}
pub async fn shell_session_write(args: ShellSessionWriteArgs) -> Result<ShellSessionWriteResult, ServiceError> {
write_session_impl(&GLOBAL_SESSION_REGISTRY, args).await
}
pub async fn shell_session_read(args: ShellSessionReadArgs) -> Result<ShellSessionReadResult, ServiceError> {
read_session_impl(&GLOBAL_SESSION_REGISTRY, args).await
}
pub async fn shell_session_list(args: ShellSessionListArgs) -> Result<ShellSessionListResult, ServiceError> {
list_sessions_impl(&GLOBAL_SESSION_REGISTRY, args).await
}
pub async fn shell_session_reconnect(args: ShellSessionReconnectArgs) -> Result<ShellSessionReconnectResult, ServiceError> {
reconnect_session_impl(&GLOBAL_SESSION_REGISTRY, args).await
}
pub async fn shell_session_resize(args: ShellSessionResizeArgs) -> Result<ShellSessionResizeResult, ServiceError> {
resize_session_impl(&GLOBAL_SESSION_REGISTRY, args).await
}
pub async fn shell_session_close(args: ShellSessionCloseArgs) -> Result<ShellSessionCloseResult, ServiceError> {
close_session_impl(&GLOBAL_SESSION_REGISTRY, args).await
}
/// Connects to the target named by `args.target_id`, wraps the connection in a
/// [`ShellSession`], and stores it in `registry`.
///
/// # Errors
///
/// Propagates any error from `lookup_target` or `SshConnection::connect`.
///
/// # Returns
///
/// The generated session id together with the session's info and the screen
/// state captured shortly after the output reader was started.
async fn create_session_impl(
    registry: &ShellSessionRegistry,
    args: ShellSessionCreateArgs,
) -> Result<ShellSessionCreateResult, ServiceError> {
    let ssh_target = lookup_target(&args.target_id).await?;

    // The tmux session name reuses the first eight characters of the UUID.
    let session_id = uuid::Uuid::new_v4().to_string();
    let tmux_name = format!("cnctd-ssh-{}", &session_id[..8]);
    info!(
        "Creating shell session {} for target {} (tmux: {})",
        session_id, args.target_id, tmux_name
    );

    let ssh = SshConnection::connect(
        &ssh_target.host,
        ssh_target.port,
        &ssh_target.user,
        &ssh_target.key_path,
        ssh_target.key_passphrase.as_deref(),
        args.cols,
        args.rows,
        args.shell.as_deref(),
    )
    .await?;

    let mut shell = ShellSession::new(
        session_id.clone(),
        args.target_id.clone(),
        args.name.clone(),
        args.client_id.clone(),
        tmux_name,
        args.cols,
        args.rows,
        ssh,
    );
    shell.start_output_reader();

    // Brief pause before snapshotting — presumably so the reader can pick up
    // the shell's initial output (banner/prompt); confirm against the reader.
    let settle = std::time::Duration::from_millis(100);
    tokio::time::sleep(settle).await;

    // Snapshot first: `add` takes ownership of the session.
    let screen = shell.screen_state().await;
    let summary = shell.info().await;
    registry.add(shell).await;

    Ok(ShellSessionCreateResult {
        session_id,
        info: summary,
        screen,
    })
}
/// Sends `args.input` to the session identified by `args.session_id`.
///
/// Backslash escapes in the input are expanded by `process_escape_sequences`
/// first; when `args.newline` is set a single `\n` byte is appended.
///
/// # Errors
///
/// Returns `ServiceError::NotFound` when the registry has no such session, and
/// propagates any error from the session's `write`.
async fn write_session_impl(
    registry: &ShellSessionRegistry,
    args: ShellSessionWriteArgs,
) -> Result<ShellSessionWriteResult, ServiceError> {
    let handle = match registry.get(&args.session_id).await {
        Some(handle) => handle,
        None => {
            return Err(ServiceError::NotFound(format!(
                "Session not found: {}",
                args.session_id
            )))
        }
    };
    let shell = handle.read().await;

    let mut payload = process_escape_sequences(&args.input);
    if args.newline {
        payload.push(b'\n');
    }

    let bytes_sent = shell.write(&payload).await?;
    Ok(ShellSessionWriteResult {
        session_id: args.session_id,
        bytes_sent,
    })
}
/// Reads output and/or screen state from the session identified by
/// `args.session_id`, optionally waiting first.
///
/// Waiting happens in this order:
/// 1. If `wait_for_pattern` is set, wait for the pattern.
/// 2. If `wait_for_stable_ms` is set, wait for output to stabilise — skipped
///    (and reported as `Some(false)`) when a requested pattern did not match.
/// 3. If neither of the above is set and `wait_ms > 0`, wait for at least
///    `min_bytes` of output for up to `wait_ms`.
///
/// Steps 1 and 2 use `wait_ms` as their timeout, or 30000 when `wait_ms` is
/// not positive.
///
/// # Errors
///
/// Returns `ServiceError::NotFound` when the registry has no such session.
async fn read_session_impl(
    registry: &ShellSessionRegistry,
    args: ShellSessionReadArgs,
) -> Result<ShellSessionReadResult, ServiceError> {
    let handle = match registry.get(&args.session_id).await {
        Some(handle) => handle,
        None => {
            return Err(ServiceError::NotFound(format!(
                "Session not found: {}",
                args.session_id
            )))
        }
    };
    let shell = handle.read().await;

    let timeout_ms = if args.wait_ms > 0 { args.wait_ms } else { 30000 };

    let pattern_matched = match args.wait_for_pattern.as_ref() {
        Some(pattern) => Some(shell.wait_for_pattern(pattern, timeout_ms).await),
        None => None,
    };

    let stabilized = match args.wait_for_stable_ms {
        None => None,
        Some(stable_ms) => {
            // Only worth waiting for stability when no pattern was requested
            // or the requested pattern was actually seen.
            let pattern_satisfied =
                args.wait_for_pattern.is_none() || pattern_matched == Some(true);
            if pattern_satisfied {
                Some(shell.wait_for_stable(stable_ms, timeout_ms).await)
            } else {
                Some(false)
            }
        }
    };

    let no_conditional_wait =
        args.wait_for_pattern.is_none() && args.wait_for_stable_ms.is_none();
    if no_conditional_wait && args.wait_ms > 0 {
        shell.wait_for_output(args.wait_ms, args.min_bytes).await;
    }

    let state = *shell.state.read().await;

    let (raw, screen, buffer_size, truncated) = match args.format {
        OutputFormat::Raw => {
            let (text, remaining, was_truncated) = shell.read(args.consume).await;
            (Some(text), None, remaining, was_truncated)
        }
        OutputFormat::Stripped => {
            let (text, remaining, was_truncated) = shell.read(args.consume).await;
            (Some(strip_ansi_codes(&text)), None, remaining, was_truncated)
        }
        OutputFormat::Screen => {
            let snapshot = shell.screen_state().await;
            // Called with `false` regardless of `args.consume`; only the
            // buffer statistics are kept, the text itself is discarded.
            let (_, remaining, was_truncated) = shell.read(false).await;
            (None, Some(snapshot), remaining, was_truncated)
        }
        OutputFormat::Both => {
            let (text, remaining, was_truncated) = shell.read(args.consume).await;
            let snapshot = shell.screen_state().await;
            (Some(text), Some(snapshot), remaining, was_truncated)
        }
    };

    Ok(ShellSessionReadResult {
        session_id: args.session_id,
        raw,
        screen,
        buffer_size,
        truncated,
        state,
        pattern_matched,
        stabilized,
    })
}
/// Lists sessions in `registry`, passing the optional target and client
/// filters and the `include_disconnected` flag straight through to
/// `ShellSessionRegistry::list`.
async fn list_sessions_impl(
    registry: &ShellSessionRegistry,
    args: ShellSessionListArgs,
) -> Result<ShellSessionListResult, ServiceError> {
    let target_filter = args.target_id.as_deref();
    let client_filter = args.client_id.as_deref();
    let sessions = registry
        .list(target_filter, client_filter, args.include_disconnected)
        .await;
    Ok(ShellSessionListResult { sessions })
}
/// Placeholder for session reconnection.
///
/// # Errors
///
/// Always returns `ServiceError::Internal`; neither argument is inspected.
async fn reconnect_session_impl(
    _registry: &ShellSessionRegistry,
    _args: ShellSessionReconnectArgs,
) -> Result<ShellSessionReconnectResult, ServiceError> {
    let message = String::from("Reconnection not yet implemented - coming soon");
    Err(ServiceError::Internal(message))
}
/// Resizes the terminal of the session identified by `args.session_id` to
/// `args.cols` x `args.rows`.
///
/// Takes the session's write lock for the duration of the resize.
///
/// # Errors
///
/// Returns `ServiceError::NotFound` when the registry has no such session, and
/// propagates any error from the session's `resize`.
async fn resize_session_impl(
    registry: &ShellSessionRegistry,
    args: ShellSessionResizeArgs,
) -> Result<ShellSessionResizeResult, ServiceError> {
    let handle = match registry.get(&args.session_id).await {
        Some(handle) => handle,
        None => {
            return Err(ServiceError::NotFound(format!(
                "Session not found: {}",
                args.session_id
            )))
        }
    };

    let mut shell = handle.write().await;
    shell.resize(args.cols, args.rows).await?;

    let size = (args.cols, args.rows);
    Ok(ShellSessionResizeResult {
        session_id: args.session_id,
        size,
    })
}
/// Removes the session identified by `args.session_id` from `registry` and
/// closes it, passing `args.force` to the session's `close`.
///
/// # Errors
///
/// * `ServiceError::NotFound` when the registry has no such session.
/// * `ServiceError::Internal` when another holder still references the
///   session, so exclusive ownership cannot be obtained.
/// * Any error from the session's `close`.
async fn close_session_impl(
    registry: &ShellSessionRegistry,
    args: ShellSessionCloseArgs,
) -> Result<ShellSessionCloseResult, ServiceError> {
    let handle = match registry.remove(&args.session_id).await {
        Some(handle) => handle,
        None => {
            return Err(ServiceError::NotFound(format!(
                "Session not found: {}",
                args.session_id
            )))
        }
    };

    // NOTE(review): the session has already been removed from the registry at
    // this point. If the unwrap below fails, it is neither closed nor put
    // back, so it can no longer be reached by id — confirm this is intended.
    let shell = Arc::try_unwrap(handle)
        .map(|lock| lock.into_inner())
        .map_err(|_| ServiceError::Internal("Session is still in use".to_string()))?;

    let closed = shell.close(args.force).await?;
    Ok(ShellSessionCloseResult {
        session_id: args.session_id,
        closed,
    })
}
/// Expands backslash escapes in `input` into the raw bytes to send to a shell.
///
/// Recognised escapes:
///
/// | Escape  | Bytes produced                                   |
/// |---------|--------------------------------------------------|
/// | `\xHH`  | the byte with hex value `HH` (exactly two digits) |
/// | `\n`    | `0x0A`                                           |
/// | `\r`    | `0x0D`                                           |
/// | `\t`    | `0x09`                                           |
/// | `\\`    | a single backslash                               |
/// | `\0`    | `0x00`                                           |
///
/// Anything else following a backslash — including a `\x` that is not
/// followed by two hexadecimal digits, and a trailing lone backslash — is
/// passed through literally. All other characters are emitted as UTF-8.
fn process_escape_sequences(input: &str) -> Vec<u8> {
    let mut result = Vec::with_capacity(input.len());
    let mut chars = input.chars().peekable();

    while let Some(c) = chars.next() {
        if c != '\\' {
            let mut buf = [0u8; 4];
            result.extend_from_slice(c.encode_utf8(&mut buf).as_bytes());
            continue;
        }

        let simple = match chars.peek().copied() {
            Some('x') => {
                // Look ahead on a copy so that nothing is consumed unless the
                // escape is well-formed. `to_digit(16)` accepts only
                // [0-9a-fA-F]; unlike `from_str_radix` it never accepts a
                // sign, so `\x+5` is not mistaken for a hex escape.
                let mut ahead = chars.clone();
                ahead.next(); // the 'x'
                let hi = ahead.next().and_then(|d| d.to_digit(16));
                let lo = ahead.next().and_then(|d| d.to_digit(16));
                if let (Some(hi), Some(lo)) = (hi, lo) {
                    // Both nibbles are < 16, so the value always fits in a byte.
                    result.push(((hi << 4) | lo) as u8);
                    chars = ahead;
                } else {
                    // Malformed: keep the backslash and let the main loop emit
                    // the `x` and whatever follows, so a later escape is
                    // still honoured.
                    result.push(b'\\');
                }
                continue;
            }
            Some('n') => Some(b'\n'),
            Some('r') => Some(b'\r'),
            Some('t') => Some(b'\t'),
            Some('\\') => Some(b'\\'),
            Some('0') => Some(0),
            _ => None,
        };

        match simple {
            Some(byte) => {
                chars.next(); // consume the escape letter
                result.push(byte);
            }
            // Unknown escape or trailing backslash: emit the backslash only;
            // the following character (if any) is handled by the main loop.
            None => result.push(b'\\'),
        }
    }

    result
}
/// Removes terminal escape sequences and stray control characters from
/// `input`, leaving printable text plus `\n`, `\r` and `\t`.
///
/// Handled sequences (all introduced by ESC, `0x1B`):
///
/// * **CSI** (`ESC [` ...): skipped up to and including the final byte, which
///   is any character in `0x40..=0x7E` (letters, `~`, but also `@`, `` ` ``,
///   `{` and so on).
/// * **OSC** (`ESC ]` ...): skipped up to and including the terminator, either
///   BEL (`0x07`) or ST (`ESC \`).
/// * **Escape with intermediates** (`ESC`, one or more bytes in
///   `0x20..=0x2F`, then a final byte), e.g. the charset selection `ESC ( B`.
/// * **Any other** `ESC x`: the single following character is dropped.
///
/// An unterminated CSI or OSC sequence swallows the rest of the input.
fn strip_ansi_codes(input: &str) -> String {
    let mut result = String::with_capacity(input.len());
    let mut chars = input.chars().peekable();

    while let Some(c) = chars.next() {
        if c == '\x1b' {
            match chars.peek().copied() {
                Some('[') => {
                    chars.next();
                    // Parameter (0x30..=0x3F) and intermediate (0x20..=0x2F)
                    // bytes are skipped; the first byte in 0x40..=0x7E ends
                    // the sequence.
                    for ch in chars.by_ref() {
                        if ('\x40'..='\x7e').contains(&ch) {
                            break;
                        }
                    }
                }
                Some(']') => {
                    chars.next();
                    while let Some(ch) = chars.next() {
                        if ch == '\x07' {
                            break;
                        }
                        if ch == '\x1b' && chars.peek() == Some(&'\\') {
                            chars.next();
                            break;
                        }
                    }
                }
                Some('\x20'..='\x2f') => {
                    // Skip every intermediate byte, then the single final
                    // byte, so e.g. `ESC ( B` does not leak a `B`.
                    while chars
                        .next_if(|ch| ('\x20'..='\x2f').contains(ch))
                        .is_some()
                    {}
                    chars.next_if(|ch| ('\x30'..='\x7e').contains(ch));
                }
                _ => {
                    // Two-character escape (or ESC at end of input).
                    chars.next();
                }
            }
        } else if !c.is_control() || matches!(c, '\n' | '\r' | '\t') {
            // Every other control character (including SO/SI) is dropped.
            result.push(c);
        }
    }

    result
}
pub fn get_shell_session_tool_definitions() -> Vec<crate::operations::ToolDefinition> {
use crate::operations::ToolDefinition;
use schemars::schema_for;
vec![
ToolDefinition {
name: "shell_session_create".to_string(),
description: "Create a new interactive shell session on a registered SSH target. Sessions are persistent and survive disconnections.".to_string(),
input_schema: serde_json::to_value(schema_for!(ShellSessionCreateArgs)).unwrap_or_default(),
},
ToolDefinition {
name: "shell_session_write".to_string(),
description: "Send input to an interactive shell session. Use for commands, keystrokes (\\x03 for Ctrl+C), or any terminal input.".to_string(),
input_schema: serde_json::to_value(schema_for!(ShellSessionWriteArgs)).unwrap_or_default(),
},
ToolDefinition {
name: "shell_session_read".to_string(),
description: "Read output from an interactive shell session. Supports raw output, screen state (for TUI apps), or both.".to_string(),
input_schema: serde_json::to_value(schema_for!(ShellSessionReadArgs)).unwrap_or_default(),
},
ToolDefinition {
name: "shell_session_list".to_string(),
description: "List interactive shell sessions. Can filter by target or client ID.".to_string(),
input_schema: serde_json::to_value(schema_for!(ShellSessionListArgs)).unwrap_or_default(),
},
ToolDefinition {
name: "shell_session_reconnect".to_string(),
description: "Reconnect to a disconnected shell session. The session must still exist on the remote server.".to_string(),
input_schema: serde_json::to_value(schema_for!(ShellSessionReconnectArgs)).unwrap_or_default(),
},
ToolDefinition {
name: "shell_session_resize".to_string(),
description: "Resize a shell session's terminal dimensions. Important for TUI applications.".to_string(),
input_schema: serde_json::to_value(schema_for!(ShellSessionResizeArgs)).unwrap_or_default(),
},
ToolDefinition {
name: "shell_session_close".to_string(),
description: "Close an interactive shell session. Terminates the remote shell.".to_string(),
input_schema: serde_json::to_value(schema_for!(ShellSessionCloseArgs)).unwrap_or_default(),
},
]
}