use anyhow::{Context, Result};
use perl_content_length_framing::{ContentLengthFramer, frame};
use serde_json::Value;
use std::io::{BufReader, Read, Write};
use std::net::TcpStream;
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
const MAX_TIMEOUT_MS: u32 = 300_000;
const DEFAULT_TIMEOUT_MS: u32 = 5000;
#[derive(Debug, Clone)]
pub struct TcpAttachConfig {
pub host: String,
pub port: u16,
pub timeout_ms: Option<u32>,
}
impl TcpAttachConfig {
pub fn new(host: String, port: u16) -> Self {
Self { host, port, timeout_ms: None }
}
pub fn with_timeout(mut self, timeout_ms: u32) -> Self {
self.timeout_ms = Some(timeout_ms);
self
}
pub fn validate(&self) -> Result<()> {
if self.host.trim().is_empty() {
anyhow::bail!("Host cannot be empty");
}
if self.port == 0 {
anyhow::bail!("Port must be in range 1-65535");
}
if let Some(timeout) = self.timeout_ms {
if timeout == 0 {
anyhow::bail!("Timeout must be greater than 0 milliseconds");
}
if timeout > MAX_TIMEOUT_MS {
anyhow::bail!("Timeout cannot exceed {} milliseconds (5 minutes)", MAX_TIMEOUT_MS);
}
}
Ok(())
}
pub fn timeout_duration(&self) -> Duration {
Duration::from_millis(self.timeout_ms.unwrap_or(DEFAULT_TIMEOUT_MS) as u64)
}
}
pub struct TcpAttachSession {
stream: Option<TcpStream>,
connected: Arc<Mutex<bool>>,
event_sender: Option<Sender<DapEvent>>,
}
#[derive(Debug, Clone)]
pub enum DapEvent {
Output { category: String, output: String },
Stopped { reason: String, thread_id: i32 },
Continued { thread_id: i32 },
Terminated { reason: String },
Error { message: String },
}
impl TcpAttachSession {
pub fn new() -> Self {
Self { stream: None, connected: Arc::new(Mutex::new(false)), event_sender: None }
}
pub fn set_event_sender(&mut self, sender: Sender<DapEvent>) {
self.event_sender = Some(sender);
}
pub fn connect(&mut self, config: &TcpAttachConfig) -> Result<()> {
config.validate()?;
let address = format!("{}:{}", config.host, config.port);
tracing::info!(address, "Connecting to Perl debugger");
let stream = TcpStream::connect_timeout(&address.parse()?, config.timeout_duration())
.context(format!("Failed to connect to {}", address))?;
let timeout = Some(config.timeout_duration());
stream.set_read_timeout(timeout)?;
stream.set_write_timeout(timeout)?;
self.stream = Some(stream);
*self.connected.lock().unwrap_or_else(|e| e.into_inner()) = true;
tracing::info!(address, "Successfully connected to Perl debugger");
Ok(())
}
pub fn is_connected(&self) -> bool {
self.connected.lock().map(|g| *g).unwrap_or(false)
}
pub fn disconnect(&mut self) -> Result<()> {
if let Some(stream) = self.stream.take() {
stream.shutdown(std::net::Shutdown::Both)?;
*self.connected.lock().unwrap_or_else(|e| e.into_inner()) = false;
tracing::info!("Disconnected from Perl debugger");
}
Ok(())
}
pub fn send_message(&mut self, message: &str) -> Result<()> {
let stream = self.stream.as_mut().context("Not connected to debugger")?;
let framed = frame(message.as_bytes());
stream.write_all(&framed).context("Failed to write to debugger")?;
stream.flush().context("Failed to flush stream")?;
Ok(())
}
pub fn start_reader(&mut self) -> Result<()> {
let stream = self.stream.take().context("No stream available")?;
let connected = Arc::clone(&self.connected);
let event_sender = self.event_sender.clone();
thread::spawn(move || {
let mut reader = BufReader::new(stream);
let mut framer = ContentLengthFramer::new();
let mut read_buf = [0u8; 8 * 1024];
loop {
let bytes_read = match reader.read(&mut read_buf) {
Ok(0) => {
tracing::debug!("TCP connection closed by debugger");
*connected.lock().unwrap_or_else(|e| e.into_inner()) = false;
if let Some(ref sender) = event_sender {
let _ = sender.send(DapEvent::Terminated {
reason: "connection_closed".to_string(),
});
}
return;
}
Ok(n) => n,
Err(e) => {
tracing::error!(error = %e, "Error reading from TCP");
*connected.lock().unwrap_or_else(|e| e.into_inner()) = false;
if let Some(ref sender) = event_sender {
let _ = sender.send(DapEvent::Error {
message: format!("TCP read error: {}", e),
});
}
return;
}
};
framer.push(&read_buf[..bytes_read]);
loop {
let buffer = match framer.try_next() {
Ok(Some(buffer)) => buffer,
Ok(None) => break,
Err(error) => {
tracing::warn!(%error, "Failed to parse TCP DAP frame");
continue;
}
};
if let Ok(text) = std::str::from_utf8(&buffer) {
tracing::trace!(output = %text, "Received from debugger");
} else {
tracing::warn!(
bytes = buffer.len(),
"Received non-UTF8 message from debugger"
);
}
if let Some(ref sender) = event_sender
&& let Ok(value) = serde_json::from_slice::<Value>(&buffer)
&& let Some(event_type) = value.get("type").and_then(|t| t.as_str())
&& event_type == "event"
{
let event_name =
value.get("event").and_then(|e| e.as_str()).unwrap_or("unknown");
match event_name {
"output" => {
let body = value.get("body");
let category = body
.and_then(|b| b.get("category"))
.and_then(|c| c.as_str())
.unwrap_or("stdout")
.to_string();
let output = body
.and_then(|b| b.get("output"))
.and_then(|o| o.as_str())
.unwrap_or("")
.to_string();
let _ = sender.send(DapEvent::Output { category, output });
}
"stopped" => {
let body = value.get("body");
let reason = body
.and_then(|b| b.get("reason"))
.and_then(|r| r.as_str())
.unwrap_or("unknown")
.to_string();
let thread_id =
body.and_then(|b| b.get("threadId"))
.and_then(|t| t.as_i64())
.unwrap_or(1) as i32;
let _ = sender.send(DapEvent::Stopped { reason, thread_id });
}
"continued" => {
let body = value.get("body");
let thread_id =
body.and_then(|b| b.get("threadId"))
.and_then(|t| t.as_i64())
.unwrap_or(1) as i32;
let _ = sender.send(DapEvent::Continued { thread_id });
}
"terminated" => {
let reason = value
.get("body")
.and_then(|b| b.get("reason"))
.and_then(|r| r.as_str())
.unwrap_or("unknown")
.to_string();
let _ = sender.send(DapEvent::Terminated { reason });
}
_ => {
tracing::debug!(event = %event_name, "Unhandled DAP event");
}
}
}
}
}
});
Ok(())
}
}
impl Default for TcpAttachSession {
fn default() -> Self {
Self::new()
}
}
impl Drop for TcpAttachSession {
fn drop(&mut self) {
let _ = self.disconnect();
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_tcp_attach_config_validation() {
let config = TcpAttachConfig::new("localhost".to_string(), 13603);
assert!(config.validate().is_ok());
let config = TcpAttachConfig::new("".to_string(), 13603);
assert!(config.validate().is_err());
let config = TcpAttachConfig::new("localhost".to_string(), 0);
assert!(config.validate().is_err());
let config = TcpAttachConfig::new("localhost".to_string(), 13603).with_timeout(5000);
assert!(config.validate().is_ok());
let config = TcpAttachConfig::new("localhost".to_string(), 13603).with_timeout(0);
assert!(config.validate().is_err());
let config =
TcpAttachConfig::new("localhost".to_string(), 13603).with_timeout(MAX_TIMEOUT_MS + 1);
assert!(config.validate().is_err());
}
#[test]
fn test_tcp_attach_session_creation() {
let session = TcpAttachSession::new();
assert!(!session.is_connected());
}
#[test]
fn test_tcp_attach_timeout_duration() {
let config = TcpAttachConfig::new("localhost".to_string(), 13603);
assert_eq!(config.timeout_duration(), Duration::from_millis(DEFAULT_TIMEOUT_MS as u64));
let config = TcpAttachConfig::new("localhost".to_string(), 13603).with_timeout(10000);
assert_eq!(config.timeout_duration(), Duration::from_millis(10000));
}
}