use std::collections::HashMap;
use std::io::{Read, Write};
use std::net::{TcpStream, ToSocketAddrs};
use std::time::Duration;
#[cfg(test)]
use super::error::{Flow, signal};
use super::value::Value;
#[cfg(test)]
use super::value::ValueKind;
use crate::heap_types::LispString;
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ConnectionType {
Plain,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum NetworkStatus {
Open,
Closed,
Failed(String),
Connecting,
}
pub struct NetworkStream {
pub id: u64,
pub name: Value,
pub host: String,
pub port: u16,
pub stream: Option<TcpStream>,
pub buffer_name: Value,
pub filter: Value,
pub sentinel: Value,
pub status: NetworkStatus,
pub output_buffer: Vec<u8>,
pub coding_system: Value,
pub conn_type: ConnectionType,
}
pub struct ProcessFilter {
pub function: Value,
pub output_buffer: LispString,
}
pub struct ProcessSentinel {
pub function: Value,
}
pub struct NetworkManager {
connections: HashMap<u64, NetworkStream>,
next_id: u64,
process_filters: HashMap<u64, ProcessFilter>,
process_sentinels: HashMap<u64, ProcessSentinel>,
}
impl Default for NetworkManager {
fn default() -> Self {
Self::new()
}
}
impl NetworkManager {
pub fn new() -> Self {
Self {
connections: HashMap::new(),
next_id: 1,
process_filters: HashMap::new(),
process_sentinels: HashMap::new(),
}
}
pub fn open_connection(
&mut self,
name: &str,
host: &str,
port: u16,
buffer: Option<&str>,
) -> Result<u64, String> {
let addr_str = format!("{}:{}", host, port);
let addrs: Vec<_> = addr_str
.to_socket_addrs()
.map_err(|e| format!("DNS resolution failed for {}: {}", addr_str, e))?
.collect();
if addrs.is_empty() {
return Err(format!("No addresses found for {}", addr_str));
}
let stream = TcpStream::connect_timeout(&addrs[0], Duration::from_secs(30))
.map_err(|e| format!("Connection to {} failed: {}", addr_str, e))?;
let _ = stream.set_read_timeout(Some(Duration::from_secs(5)));
let id = self.next_id;
self.next_id += 1;
let conn = NetworkStream {
id,
name: Value::string(name),
host: host.to_string(),
port,
stream: Some(stream),
buffer_name: buffer.map(Value::string).unwrap_or(Value::NIL),
filter: Value::NIL,
sentinel: Value::NIL,
status: NetworkStatus::Open,
output_buffer: Vec::new(),
coding_system: Value::symbol("utf-8"),
conn_type: ConnectionType::Plain,
};
self.connections.insert(id, conn);
Ok(id)
}
pub fn close_connection(&mut self, id: u64) -> bool {
if let Some(conn) = self.connections.get_mut(&id) {
conn.stream = None;
conn.status = NetworkStatus::Closed;
true
} else {
false
}
}
pub fn send_data(&mut self, id: u64, data: &[u8]) -> Result<usize, String> {
let conn = self
.connections
.get_mut(&id)
.ok_or_else(|| format!("No connection with id {}", id))?;
match conn.status {
NetworkStatus::Open => {}
ref status => {
return Err(format!(
"Connection {} is not open (status: {:?})",
id, status
));
}
}
let stream = conn
.stream
.as_mut()
.ok_or_else(|| format!("Connection {} has no underlying stream", id))?;
let n = stream
.write(data)
.map_err(|e| format!("Write error on connection {}: {}", id, e))?;
stream
.flush()
.map_err(|e| format!("Flush error on connection {}: {}", id, e))?;
Ok(n)
}
pub fn receive_data(&mut self, id: u64, timeout: Option<Duration>) -> Result<Vec<u8>, String> {
let conn = self
.connections
.get_mut(&id)
.ok_or_else(|| format!("No connection with id {}", id))?;
match conn.status {
NetworkStatus::Open => {}
ref status => {
return Err(format!(
"Connection {} is not open (status: {:?})",
id, status
));
}
}
let stream = conn
.stream
.as_mut()
.ok_or_else(|| format!("Connection {} has no underlying stream", id))?;
if let Some(dur) = timeout {
let _ = stream.set_read_timeout(Some(dur));
}
let mut buf = vec![0u8; 4096];
match stream.read(&mut buf) {
Ok(0) => {
conn.status = NetworkStatus::Closed;
Ok(Vec::new())
}
Ok(n) => {
buf.truncate(n);
conn.output_buffer.extend_from_slice(&buf);
Ok(buf)
}
Err(ref e)
if e.kind() == std::io::ErrorKind::WouldBlock
|| e.kind() == std::io::ErrorKind::TimedOut =>
{
Ok(Vec::new())
}
Err(e) => {
conn.status = NetworkStatus::Failed(e.to_string());
Err(format!("Read error on connection {}: {}", id, e))
}
}
}
pub fn connection_status(&self, id: u64) -> Option<&NetworkStatus> {
self.connections.get(&id).map(|c| &c.status)
}
pub fn get_connection(&self, id: u64) -> Option<&NetworkStream> {
self.connections.get(&id)
}
pub fn list_connections(&self) -> Vec<(u64, Value, &str, u16)> {
self.connections
.values()
.map(|c| (c.id, c.name, c.host.as_str(), c.port))
.collect()
}
pub fn delete_connection(&mut self, id: u64) -> bool {
if let Some(mut conn) = self.connections.remove(&id) {
conn.stream = None;
self.process_filters.remove(&id);
self.process_sentinels.remove(&id);
true
} else {
false
}
}
pub fn set_process_filter(&mut self, process_id: u64, filter: Value) {
self.process_filters.insert(
process_id,
ProcessFilter {
function: filter,
output_buffer: LispString::from_utf8(""),
},
);
}
pub fn set_process_sentinel(&mut self, process_id: u64, sentinel: Value) {
self.process_sentinels
.insert(process_id, ProcessSentinel { function: sentinel });
}
pub fn get_process_filter(&self, process_id: u64) -> Option<Value> {
self.process_filters.get(&process_id).map(|f| f.function)
}
pub fn get_process_sentinel(&self, process_id: u64) -> Option<Value> {
self.process_sentinels.get(&process_id).map(|s| s.function)
}
pub fn remove_process_filter(&mut self, process_id: u64) {
self.process_filters.remove(&process_id);
}
pub fn remove_process_sentinel(&mut self, process_id: u64) {
self.process_sentinels.remove(&process_id);
}
pub fn accept_process_output(
&mut self,
id: u64,
timeout: Option<Duration>,
) -> Result<String, String> {
if timeout.is_some() {
let _ = self.receive_data(id, timeout);
}
let conn = self
.connections
.get_mut(&id)
.ok_or_else(|| format!("No connection with id {}", id))?;
let data = std::mem::take(&mut conn.output_buffer);
Ok(String::from_utf8_lossy(&data).into_owned())
}
pub fn process_output_pending(&self, id: u64) -> bool {
self.connections
.get(&id)
.map(|c| !c.output_buffer.is_empty())
.unwrap_or(false)
}
pub fn url_retrieve_synchronously(&mut self, url: &str) -> Result<String, String> {
let (host, port, path) = parse_http_url(url)?;
let addr_str = format!("{}:{}", host, port);
let addrs: Vec<_> = addr_str
.to_socket_addrs()
.map_err(|e| format!("DNS resolution failed for {}: {}", host, e))?
.collect();
if addrs.is_empty() {
return Err(format!("No addresses found for {}", host));
}
let mut stream = TcpStream::connect_timeout(&addrs[0], Duration::from_secs(30))
.map_err(|e| format!("Connection to {}:{} failed: {}", host, port, e))?;
let _ = stream.set_read_timeout(Some(Duration::from_secs(30)));
let request = format!(
"GET {} HTTP/1.0\r\nHost: {}\r\nConnection: close\r\n\r\n",
path, host,
);
stream
.write_all(request.as_bytes())
.map_err(|e| format!("Write error: {}", e))?;
let mut response = Vec::new();
stream
.read_to_end(&mut response)
.map_err(|e| format!("Read error: {}", e))?;
let response_str = String::from_utf8_lossy(&response).into_owned();
if let Some(pos) = response_str.find("\r\n\r\n") {
Ok(response_str[pos + 4..].to_string())
} else {
Ok(response_str)
}
}
}
fn parse_http_url(url: &str) -> Result<(String, u16, String), String> {
let rest = if let Some(stripped) = url.strip_prefix("http://") {
stripped
} else if let Some(stripped) = url.strip_prefix("https://") {
stripped
} else {
return Err(format!("Unsupported URL scheme in: {}", url));
};
let default_port: u16 = if url.starts_with("https://") { 443 } else { 80 };
let (hostport, path) = match rest.find('/') {
Some(idx) => (&rest[..idx], &rest[idx..]),
None => (rest, "/"),
};
let (host, port) = if let Some(colon_idx) = hostport.rfind(':') {
let port_str = &hostport[colon_idx + 1..];
let port: u16 = port_str
.parse()
.map_err(|_| format!("Invalid port in URL: {}", port_str))?;
(&hostport[..colon_idx], port)
} else {
(hostport, default_port)
};
if host.is_empty() {
return Err("Empty host in URL".to_string());
}
Ok((host.to_string(), port, path.to_string()))
}
#[cfg(test)]
fn expect_args(name: &str, args: &[Value], n: usize) -> Result<(), Flow> {
if args.len() != n {
Err(signal(
"wrong-number-of-arguments",
vec![Value::symbol(name), Value::fixnum(args.len() as i64)],
))
} else {
Ok(())
}
}
#[cfg(test)]
fn expect_min_args(name: &str, args: &[Value], min: usize) -> Result<(), Flow> {
if args.len() < min {
Err(signal(
"wrong-number-of-arguments",
vec![Value::symbol(name), Value::fixnum(args.len() as i64)],
))
} else {
Ok(())
}
}
#[cfg(test)]
fn expect_string(value: &Value) -> Result<String, Flow> {
match value.kind() {
ValueKind::String => Ok(value
.as_runtime_string_owned()
.expect("ValueKind::String must carry LispString payload")),
ValueKind::Symbol(id) => Ok(crate::emacs_core::intern::resolve_sym(id).to_owned()),
ValueKind::Nil => Ok("nil".to_string()),
ValueKind::T => Ok("t".to_string()),
other => Err(signal(
"wrong-type-argument",
vec![Value::symbol("stringp"), *value],
)),
}
}
#[cfg(test)]
fn expect_int(value: &Value) -> Result<i64, Flow> {
match value.kind() {
ValueKind::Fixnum(n) => Ok(n),
other => Err(signal(
"wrong-type-argument",
vec![Value::symbol("integerp"), *value],
)),
}
}
#[cfg(test)]
#[path = "network_test.rs"]
mod tests;