use crate::{Result, MitoxideError, Context, Router};
use mitoxide_ssh::{Transport, StdioTransport, SshConfig, ConnectionInfo};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, RwLock};
use tracing::{debug, info, warn};
use uuid::Uuid;
/// Complete set of options used to establish a session.
///
/// Produced by [`SessionBuilder::build_config`] and consumed by
/// [`Session::connect`].
#[derive(Debug, Clone)]
pub struct SessionConfig {
    /// SSH settings; a clone is handed to the transport when connecting.
    pub ssh_config: SshConfig,
    /// Settings for the remote agent (binary location, verification flags).
    pub agent_config: AgentConfig,
    /// Timeout passed to the router when the session is established.
    pub timeout: Duration,
    /// Stream limit passed to the router when the session is established.
    pub max_streams: u32,
    /// When `true`, `Session::connect` bootstraps the agent over the transport
    /// before the router is created.
    pub bootstrap_agent: bool,
}
/// Options describing the agent binary that is bootstrapped on the target.
#[derive(Debug, Clone)]
pub struct AgentConfig {
    /// Local path of the agent binary to upload; `None` means no custom
    /// binary was configured.
    pub binary_path: Option<PathBuf>,
    /// Time budget for agent execution.
    // NOTE(review): not read anywhere in this file — confirm where it is enforced.
    pub execution_timeout: Duration,
    /// Whether the agent binary's hash should be verified.
    // NOTE(review): not read anywhere in this file — confirm where it is enforced.
    pub verify_hash: bool,
    /// Whether the agent binary's signature should be verified.
    // NOTE(review): not read anywhere in this file — confirm where it is enforced.
    pub verify_signature: bool,
}
impl Default for AgentConfig {
fn default() -> Self {
Self {
binary_path: None,
execution_timeout: Duration::from_secs(300),
verify_hash: false,
verify_signature: false,
}
}
}
/// Lifecycle status of a session.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionStatus {
    /// Initial status while the transport connection is being set up.
    Connecting,
    /// The agent binary is being bootstrapped on the target.
    Bootstrapping,
    /// The router is up; this is the only status in which
    /// `ConnectedSession::context` hands out a context.
    Active,
    /// Set by `ConnectedSession::disconnect`.
    Disconnected,
    /// The session failed; the payload is a description of the failure.
    // NOTE(review): never constructed in this file — confirm who sets it.
    Error(String),
}
/// Snapshot of a session's identity and current status.
#[derive(Debug, Clone)]
pub struct SessionState {
    /// Unique identifier generated when the session connects.
    pub id: Uuid,
    /// Target string the session was created for.
    pub target: String,
    /// Current lifecycle status.
    pub status: SessionStatus,
    /// Version reported by the remote agent, if known.
    // NOTE(review): only ever initialised to `None` in this file.
    pub agent_version: Option<String>,
    /// Names of the capabilities available on this session
    /// (for example `"process_exec"`).
    pub capabilities: Vec<String>,
    /// Connection details reported by the transport once connected.
    pub connection_info: Option<ConnectionInfo>,
}
/// Fluent builder that collects session options before connecting.
pub struct SessionBuilder {
    /// Target string exactly as supplied to `SessionBuilder::new`.
    target: String,
    /// SSH settings; host, port and username are derived from `target`.
    ssh_config: SshConfig,
    /// Settings for the remote agent.
    agent_config: AgentConfig,
    /// Session timeout (30 seconds unless overridden).
    timeout: Duration,
    /// Stream limit (100 unless overridden).
    max_streams: u32,
    /// Whether the agent is bootstrapped on connect (enabled unless overridden).
    bootstrap_agent: bool,
}
impl SessionBuilder {
/// Creates a builder for `target`.
///
/// The user name, host and port are extracted from `target` via
/// `parse_target`; every other SSH option keeps its `Default` value.
/// The builder starts with a 30 second timeout, a limit of 100 streams,
/// default agent settings and agent bootstrapping enabled.
pub fn new(target: String) -> Self {
    let (username, host, port) = Self::parse_target(&target);
    Self {
        ssh_config: SshConfig {
            host,
            port,
            username,
            ..Default::default()
        },
        target,
        agent_config: AgentConfig::default(),
        timeout: Duration::from_secs(30),
        max_streams: 100,
        bootstrap_agent: true,
    }
}
/// Splits a target of the form `[user@]host[:port]` into
/// `(username, host, port)`.
///
/// * The user name defaults to `"root"` when it is missing or empty.
/// * The port defaults to `22` when it is missing or not a valid `u16`; in the
///   latter case the text after the colon is left as part of the host.
/// * The split between user and host happens at the *last* `@`, so a user
///   name that itself contains `@` stays intact.
/// * IPv6 literals are recognised: a bracketed `[addr]` or `[addr]:port` has
///   its brackets removed, and a bare address with more than one colon
///   (e.g. `::1`) is never split into host and port.
// NOTE(review): brackets are stripped on the assumption that the transport
// wants the bare address in `SshConfig::host` — confirm against its consumer.
fn parse_target(target: &str) -> (String, String, u16) {
    const DEFAULT_USER: &str = "root";
    const DEFAULT_PORT: u16 = 22;

    let (user, endpoint) = match target.rsplit_once('@') {
        Some((user, endpoint)) if !user.is_empty() => (user, endpoint),
        Some((_, endpoint)) => (DEFAULT_USER, endpoint),
        None => (DEFAULT_USER, target),
    };

    // `[addr]` or `[addr]:port`; anything malformed falls through to the
    // generic handling below and is kept verbatim.
    let bracketed = endpoint
        .strip_prefix('[')
        .and_then(|rest| rest.split_once(']'))
        .filter(|(addr, _)| !addr.is_empty())
        .and_then(|(addr, tail)| {
            if tail.is_empty() {
                Some((addr, DEFAULT_PORT))
            } else {
                tail.strip_prefix(':')
                    .and_then(|port_text| port_text.parse::<u16>().ok())
                    .map(|port| (addr, port))
            }
        });

    let (host, port) = bracketed.unwrap_or_else(|| match endpoint.rsplit_once(':') {
        // Exactly one colon: `host:port`. More than one colon means a bare
        // IPv6 literal, whose last group must not be mistaken for a port.
        Some((host, port_text)) if !host.contains(':') => match port_text.parse::<u16>() {
            Ok(port) => (host, port),
            Err(_) => (endpoint, DEFAULT_PORT),
        },
        _ => (endpoint, DEFAULT_PORT),
    });

    (user.to_string(), host.to_string(), port)
}
pub fn with_key(mut self, key_path: PathBuf) -> Self {
self.ssh_config.key_path = Some(key_path);
self
}
/// Sets the session timeout and derives the SSH connect timeout from it.
///
/// The SSH connect timeout is stored in whole seconds. `timeout` is rounded
/// *up* to the next second, so a non-zero sub-second value (for example
/// 500 ms) is stored as 1 rather than being truncated to 0, and the connect
/// timeout is never shorter than the requested session timeout. A zero
/// duration is still stored as 0.
pub fn with_timeout(mut self, timeout: Duration) -> Self {
    // `Duration::as_secs` drops the fractional part; compensate for it here.
    let whole_secs = timeout.as_secs();
    let connect_secs = if timeout.subsec_nanos() > 0 {
        whole_secs.saturating_add(1)
    } else {
        whole_secs
    };

    self.timeout = timeout;
    self.ssh_config.connect_timeout = connect_secs;
    self
}
/// Adds an entry to the SSH configuration's `options` collection, keyed by
/// `key`.
pub fn with_ssh_option(mut self, key: String, value: String) -> Self {
    let ssh_options = &mut self.ssh_config.options;
    // Whatever `insert` returns is intentionally discarded.
    let _ = ssh_options.insert(key, value);
    self
}
pub fn with_agent_binary(mut self, path: PathBuf) -> Self {
self.agent_config.binary_path = Some(path);
self
}
pub fn with_bootstrap(mut self, bootstrap: bool) -> Self {
self.bootstrap_agent = bootstrap;
self
}
pub fn with_max_streams(mut self, max_streams: u32) -> Self {
self.max_streams = max_streams;
self
}
pub fn with_hash_verification(mut self, verify: bool) -> Self {
self.agent_config.verify_hash = verify;
self
}
pub fn build_config(self) -> SessionConfig {
SessionConfig {
ssh_config: self.ssh_config,
agent_config: self.agent_config,
timeout: self.timeout,
max_streams: self.max_streams,
bootstrap_agent: self.bootstrap_agent,
}
}
/// Builds the configuration and connects to the target.
///
/// # Errors
///
/// Returns whatever error [`Session::connect`] reports.
pub async fn connect(self) -> Result<ConnectedSession> {
    // The target is cloned first because `build_config` consumes the builder.
    Session::new(self.target.clone(), self.build_config())
        .connect()
        .await
}
}
/// Handle to an established session.
///
/// Dropping the handle sends a best-effort shutdown signal; call
/// `disconnect` for an orderly shutdown.
pub struct ConnectedSession {
    /// Shared, lock-protected session state.
    state: Arc<RwLock<SessionState>>,
    /// Router shared with every context created from this session.
    router: Arc<Router>,
    /// Configuration the session was established with.
    config: SessionConfig,
    /// Sender used to signal shutdown on disconnect or drop.
    shutdown_tx: mpsc::Sender<()>,
}
impl ConnectedSession {
pub(crate) fn new(
state: SessionState,
router: Router,
config: SessionConfig,
shutdown_tx: mpsc::Sender<()>,
) -> Self {
Self {
state: Arc::new(RwLock::new(state)),
router: Arc::new(router),
config,
shutdown_tx,
}
}
pub async fn state(&self) -> SessionState {
self.state.read().await.clone()
}
pub async fn id(&self) -> Uuid {
self.state.read().await.id
}
pub async fn context(&self) -> Result<Context> {
let state = self.state.read().await;
if state.status != SessionStatus::Active {
return Err(MitoxideError::Protocol(
format!("Session not active: {:?}", state.status)
));
}
Context::new(state.id, self.router.clone())
}
pub async fn ping(&self) -> Result<Duration> {
let context = self.context().await?;
context.ping().await
}
pub async fn connection_info(&self) -> Option<ConnectionInfo> {
self.state.read().await.connection_info.clone()
}
pub async fn disconnect(self) -> Result<()> {
info!("Disconnecting session {}", self.id().await);
{
let mut state = self.state.write().await;
state.status = SessionStatus::Disconnected;
}
if let Err(e) = self.shutdown_tx.send(()).await {
warn!("Failed to send shutdown signal: {}", e);
}
self.router.shutdown().await?;
info!("Session disconnected successfully");
Ok(())
}
}
impl Drop for ConnectedSession {
    /// Sends a non-blocking, best-effort shutdown signal when the handle goes
    /// out of scope.
    fn drop(&mut self) {
        // Nothing useful can be done about a failed send while dropping, so
        // both outcomes are deliberately ignored.
        match self.shutdown_tx.try_send(()) {
            Ok(()) | Err(_) => {}
        }
    }
}
/// A not-yet-connected session: a target plus the configuration to reach it.
pub struct Session {
    /// Target string; used for logging and recorded in the session state.
    target: String,
    /// Options applied while connecting.
    config: SessionConfig,
}
impl Session {
pub async fn ssh(target: &str) -> Result<SessionBuilder> {
debug!("Creating SSH session builder for target: {}", target);
Ok(SessionBuilder::new(target.to_string()))
}
pub fn new(target: String, config: SessionConfig) -> Self {
Self { target, config }
}
pub async fn connect(self) -> Result<ConnectedSession> {
info!("Connecting to target: {}", self.target);
let session_id = Uuid::new_v4();
let mut state = SessionState {
id: session_id,
target: self.target.clone(),
status: SessionStatus::Connecting,
agent_version: None,
capabilities: Vec::new(),
connection_info: None,
};
let mut transport = StdioTransport::new(self.config.ssh_config.clone());
transport.test_connection().await
.map_err(|e| MitoxideError::Transport(format!("Connection test failed: {}", e)))?;
let connection = transport.connect().await
.map_err(|e| MitoxideError::Transport(format!("Failed to connect: {}", e)))?;
state.connection_info = Some(transport.connection_info());
if self.config.bootstrap_agent {
state.status = SessionStatus::Bootstrapping;
let agent_binary = self.get_agent_binary().await?;
transport.bootstrap_agent(&agent_binary).await
.map_err(|e| MitoxideError::Agent(format!("Agent bootstrap failed: {}", e)))?;
info!("Agent bootstrapped successfully");
}
let (router, shutdown_tx) = Router::new(
connection,
self.config.max_streams,
self.config.timeout,
).await?;
state.status = SessionStatus::Active;
state.capabilities = vec![
"process_exec".to_string(),
"file_ops".to_string(),
];
#[cfg(feature = "wasm")]
{
state.capabilities.push("wasm_exec".to_string());
}
info!("Session {} established successfully", session_id);
Ok(ConnectedSession::new(state, router, self.config, shutdown_tx))
}
async fn get_agent_binary(&self) -> Result<Vec<u8>> {
if let Some(binary_path) = &self.config.agent_config.binary_path {
tokio::fs::read(binary_path).await
.map_err(|e| MitoxideError::Agent(format!("Failed to read agent binary: {}", e)))
} else {
Ok(b"#!/bin/bash\necho 'Mock agent binary'\n".to_vec())
}
}
}
#[cfg(test)]
mod tests;