use dashmap::DashMap;
use parking_lot::Mutex;
use std::collections::HashSet;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use uuid::Uuid;
use crate::{Result, ServiceProtocol, TunnelError};
#[derive(Debug, Clone)]
pub enum ControlMessage {
Connect {
service_id: Uuid,
connection_id: Uuid,
client_addr: SocketAddr,
},
Heartbeat {
timestamp: u64,
},
Disconnect {
reason: String,
},
}
#[derive(Debug)]
pub struct TunnelService {
pub id: Uuid,
pub name: String,
pub protocol: ServiceProtocol,
pub local_port: u16,
pub remote_port: u16,
pub assigned_port: Option<u16>,
pub active_connections: AtomicUsize,
}
impl TunnelService {
#[must_use]
pub fn new(name: String, protocol: ServiceProtocol, local_port: u16, remote_port: u16) -> Self {
Self {
id: Uuid::new_v4(),
name,
protocol,
local_port,
remote_port,
assigned_port: None,
active_connections: AtomicUsize::new(0),
}
}
#[must_use]
pub fn connection_count(&self) -> usize {
self.active_connections.load(Ordering::Relaxed)
}
pub fn increment_connections(&self) {
self.active_connections.fetch_add(1, Ordering::Relaxed);
}
pub fn decrement_connections(&self) {
self.active_connections.fetch_sub(1, Ordering::Relaxed);
}
}
impl Clone for TunnelService {
fn clone(&self) -> Self {
Self {
id: self.id,
name: self.name.clone(),
protocol: self.protocol,
local_port: self.local_port,
remote_port: self.remote_port,
assigned_port: self.assigned_port,
active_connections: AtomicUsize::new(self.active_connections.load(Ordering::Relaxed)),
}
}
}
#[derive(Debug)]
pub struct Tunnel {
pub id: Uuid,
pub token_hash: String,
pub name: Option<String>,
pub services: DashMap<Uuid, TunnelService>,
pub control_tx: mpsc::Sender<ControlMessage>,
pub connected_at: Instant,
pub last_activity: Mutex<Instant>,
pub client_addr: Option<SocketAddr>,
}
impl Tunnel {
#[must_use]
pub fn new(
token_hash: String,
name: Option<String>,
control_tx: mpsc::Sender<ControlMessage>,
client_addr: Option<SocketAddr>,
) -> Self {
let now = Instant::now();
Self {
id: Uuid::new_v4(),
token_hash,
name,
services: DashMap::new(),
control_tx,
connected_at: now,
last_activity: Mutex::new(now),
client_addr,
}
}
pub fn touch(&self) {
*self.last_activity.lock() = Instant::now();
}
#[must_use]
pub fn last_activity(&self) -> Instant {
*self.last_activity.lock()
}
#[must_use]
pub fn service_count(&self) -> usize {
self.services.len()
}
#[must_use]
pub fn is_idle_for(&self, duration: Duration) -> bool {
self.last_activity().elapsed() > duration
}
pub async fn send_control(&self, message: ControlMessage) -> Result<()> {
self.control_tx
.send(message)
.await
.map_err(|_| TunnelError::registry("control channel closed"))
}
pub fn try_send_control(&self, message: ControlMessage) -> Result<()> {
self.control_tx
.try_send(message)
.map_err(|e| TunnelError::registry(format!("failed to send control message: {e}")))
}
}
#[derive(Debug)]
struct PortPool {
range_start: u16,
range_end: u16,
used_ports: HashSet<u16>,
}
impl PortPool {
fn new(start: u16, end: u16) -> Self {
Self {
range_start: start,
range_end: end,
used_ports: HashSet::new(),
}
}
fn allocate(&mut self) -> Option<u16> {
for port in self.range_start..=self.range_end {
if !self.used_ports.contains(&port) {
self.used_ports.insert(port);
return Some(port);
}
}
None
}
fn allocate_specific(&mut self, port: u16) -> bool {
if port < self.range_start || port > self.range_end {
return false;
}
if self.used_ports.contains(&port) {
return false;
}
self.used_ports.insert(port);
true
}
fn release(&mut self, port: u16) {
self.used_ports.remove(&port);
}
fn is_available(&self, port: u16) -> bool {
port >= self.range_start && port <= self.range_end && !self.used_ports.contains(&port)
}
fn available_count(&self) -> usize {
let total = (self.range_end - self.range_start + 1) as usize;
total.saturating_sub(self.used_ports.len())
}
}
#[derive(Debug, Clone)]
pub struct TunnelInfo {
pub id: Uuid,
pub name: Option<String>,
pub connected_at: Instant,
pub last_activity: Instant,
pub service_count: usize,
pub client_addr: Option<SocketAddr>,
}
impl From<&Tunnel> for TunnelInfo {
fn from(tunnel: &Tunnel) -> Self {
Self {
id: tunnel.id,
name: tunnel.name.clone(),
connected_at: tunnel.connected_at,
last_activity: tunnel.last_activity(),
service_count: tunnel.service_count(),
client_addr: tunnel.client_addr,
}
}
}
#[derive(Debug)]
pub struct TunnelRegistry {
tunnels: DashMap<Uuid, Arc<Tunnel>>,
tunnels_by_token: DashMap<String, Uuid>,
services_by_port: DashMap<u16, (Uuid, Uuid)>,
port_pool: Mutex<PortPool>,
}
impl TunnelRegistry {
#[must_use]
pub fn new(port_range: (u16, u16)) -> Self {
Self {
tunnels: DashMap::new(),
tunnels_by_token: DashMap::new(),
services_by_port: DashMap::new(),
port_pool: Mutex::new(PortPool::new(port_range.0, port_range.1)),
}
}
pub fn register_tunnel(
&self,
token_hash: String,
name: Option<String>,
control_tx: mpsc::Sender<ControlMessage>,
client_addr: Option<SocketAddr>,
) -> Result<Arc<Tunnel>> {
if self.tunnels_by_token.contains_key(&token_hash) {
return Err(TunnelError::registry("token already registered"));
}
let tunnel = Arc::new(Tunnel::new(
token_hash.clone(),
name,
control_tx,
client_addr,
));
self.tunnels_by_token.insert(token_hash, tunnel.id);
self.tunnels.insert(tunnel.id, Arc::clone(&tunnel));
tracing::info!(
tunnel_id = %tunnel.id,
client_addr = ?client_addr,
"tunnel registered"
);
Ok(tunnel)
}
pub fn unregister_tunnel(&self, tunnel_id: Uuid) -> Option<Arc<Tunnel>> {
let tunnel = self.tunnels.remove(&tunnel_id).map(|(_, t)| t)?;
self.tunnels_by_token.remove(&tunnel.token_hash);
let mut port_pool = self.port_pool.lock();
for entry in &tunnel.services {
let service = entry.value();
if let Some(port) = service.assigned_port {
self.services_by_port.remove(&port);
port_pool.release(port);
}
}
tracing::info!(
tunnel_id = %tunnel.id,
services_removed = tunnel.services.len(),
"tunnel unregistered"
);
Some(tunnel)
}
#[must_use]
pub fn get_tunnel(&self, tunnel_id: Uuid) -> Option<Arc<Tunnel>> {
self.tunnels.get(&tunnel_id).map(|entry| Arc::clone(&entry))
}
#[must_use]
pub fn get_tunnel_by_token(&self, token_hash: &str) -> Option<Arc<Tunnel>> {
let tunnel_id = self.tunnels_by_token.get(token_hash)?;
self.get_tunnel(*tunnel_id)
}
#[must_use]
pub fn token_exists(&self, token_hash: &str) -> bool {
self.tunnels_by_token.contains_key(token_hash)
}
pub fn add_service(
&self,
tunnel_id: Uuid,
name: &str,
protocol: ServiceProtocol,
local_port: u16,
remote_port: u16,
) -> Result<TunnelService> {
let tunnel = self
.get_tunnel(tunnel_id)
.ok_or_else(|| TunnelError::registry("tunnel not found"))?;
let mut service = TunnelService::new(name.to_string(), protocol, local_port, remote_port);
let assigned_port = {
let mut port_pool = self.port_pool.lock();
if remote_port == 0 {
port_pool
.allocate()
.ok_or_else(|| TunnelError::registry("no ports available"))?
} else {
if !port_pool.allocate_specific(remote_port) {
return Err(TunnelError::registry(format!(
"port {remote_port} is not available"
)));
}
remote_port
}
};
service.assigned_port = Some(assigned_port);
self.services_by_port
.insert(assigned_port, (tunnel_id, service.id));
tunnel.services.insert(service.id, service.clone());
tracing::info!(
tunnel_id = %tunnel_id,
service_id = %service.id,
service_name = %name,
assigned_port = assigned_port,
"service registered"
);
Ok(service)
}
pub fn remove_service(&self, tunnel_id: Uuid, service_id: Uuid) -> Result<TunnelService> {
let tunnel = self
.get_tunnel(tunnel_id)
.ok_or_else(|| TunnelError::registry("tunnel not found"))?;
let (_, service) = tunnel
.services
.remove(&service_id)
.ok_or_else(|| TunnelError::registry("service not found"))?;
if let Some(port) = service.assigned_port {
self.services_by_port.remove(&port);
self.port_pool.lock().release(port);
}
tracing::info!(
tunnel_id = %tunnel_id,
service_id = %service_id,
service_name = %service.name,
"service unregistered"
);
Ok(service)
}
#[must_use]
pub fn get_service_by_port(&self, port: u16) -> Option<(Arc<Tunnel>, TunnelService)> {
let (tunnel_id, service_id) = *self.services_by_port.get(&port)?;
let tunnel = self.get_tunnel(tunnel_id)?;
let service = tunnel.services.get(&service_id)?.clone();
Some((tunnel, service))
}
#[must_use]
pub fn list_tunnels(&self) -> Vec<TunnelInfo> {
self.tunnels
.iter()
.map(|entry| TunnelInfo::from(entry.value().as_ref()))
.collect()
}
#[must_use]
pub fn tunnel_count(&self) -> usize {
self.tunnels.len()
}
#[must_use]
pub fn service_count(&self) -> usize {
self.tunnels
.iter()
.map(|entry| entry.value().service_count())
.sum()
}
pub fn touch_tunnel(&self, tunnel_id: Uuid) {
if let Some(tunnel) = self.get_tunnel(tunnel_id) {
tunnel.touch();
}
}
pub fn cleanup_stale(&self, max_idle: Duration) -> Vec<Uuid> {
let stale_ids: Vec<Uuid> = self
.tunnels
.iter()
.filter(|entry| entry.value().is_idle_for(max_idle))
.map(|entry| *entry.key())
.collect();
for tunnel_id in &stale_ids {
if let Some(tunnel) = self.unregister_tunnel(*tunnel_id) {
let _ = tunnel.try_send_control(ControlMessage::Disconnect {
reason: "idle timeout".to_string(),
});
tracing::info!(
tunnel_id = %tunnel_id,
"removed stale tunnel"
);
}
}
stale_ids
}
#[must_use]
pub fn available_ports(&self) -> usize {
self.port_pool.lock().available_count()
}
#[must_use]
pub fn is_port_available(&self, port: u16) -> bool {
self.port_pool.lock().is_available(port)
}
}
impl Default for TunnelRegistry {
fn default() -> Self {
Self::new((30000, 31000))
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
fn create_test_registry() -> TunnelRegistry {
TunnelRegistry::new((30000, 30100))
}
fn create_test_tunnel(registry: &TunnelRegistry, token: &str) -> Arc<Tunnel> {
let (tx, _rx) = mpsc::channel(16);
registry
.register_tunnel(
token.to_string(),
Some("test-tunnel".to_string()),
tx,
Some("127.0.0.1:12345".parse().unwrap()),
)
.unwrap()
}
#[test]
fn test_register_tunnel() {
let registry = create_test_registry();
let (tx, _rx) = mpsc::channel(16);
let tunnel = registry
.register_tunnel(
"token123".to_string(),
Some("my-tunnel".to_string()),
tx,
Some("192.168.1.100:54321".parse().unwrap()),
)
.unwrap();
assert_eq!(tunnel.token_hash, "token123");
assert_eq!(tunnel.name, Some("my-tunnel".to_string()));
assert_eq!(
tunnel.client_addr,
Some("192.168.1.100:54321".parse().unwrap())
);
assert_eq!(registry.tunnel_count(), 1);
}
#[test]
fn test_register_duplicate_token() {
let registry = create_test_registry();
let (tx1, _rx1) = mpsc::channel(16);
let (tx2, _rx2) = mpsc::channel(16);
registry
.register_tunnel("same-token".to_string(), None, tx1, None)
.unwrap();
let result = registry.register_tunnel("same-token".to_string(), None, tx2, None);
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("already registered"));
}
#[test]
fn test_unregister_tunnel() {
let registry = create_test_registry();
let tunnel = create_test_tunnel(®istry, "token123");
let tunnel_id = tunnel.id;
registry
.add_service(tunnel_id, "ssh", ServiceProtocol::Tcp, 22, 0)
.unwrap();
assert_eq!(registry.tunnel_count(), 1);
assert_eq!(registry.service_count(), 1);
let removed = registry.unregister_tunnel(tunnel_id);
assert!(removed.is_some());
assert_eq!(registry.tunnel_count(), 0);
assert_eq!(registry.service_count(), 0);
assert!(registry.get_tunnel(tunnel_id).is_none());
assert!(registry.get_tunnel_by_token("token123").is_none());
}
#[test]
fn test_get_tunnel_by_id() {
let registry = create_test_registry();
let tunnel = create_test_tunnel(®istry, "token123");
let found = registry.get_tunnel(tunnel.id);
assert!(found.is_some());
assert_eq!(found.unwrap().id, tunnel.id);
assert!(registry.get_tunnel(Uuid::new_v4()).is_none());
}
#[test]
fn test_get_tunnel_by_token() {
let registry = create_test_registry();
let tunnel = create_test_tunnel(®istry, "my-secret-token");
let found = registry.get_tunnel_by_token("my-secret-token");
assert!(found.is_some());
assert_eq!(found.unwrap().id, tunnel.id);
assert!(registry.get_tunnel_by_token("wrong-token").is_none());
}
#[test]
fn test_token_exists() {
let registry = create_test_registry();
create_test_tunnel(®istry, "token123");
assert!(registry.token_exists("token123"));
assert!(!registry.token_exists("other-token"));
}
#[test]
fn test_add_service_auto_assign() {
let registry = create_test_registry();
let tunnel = create_test_tunnel(®istry, "token123");
let service = registry
.add_service(
tunnel.id,
"ssh",
ServiceProtocol::Tcp,
22,
0, )
.unwrap();
assert_eq!(service.name, "ssh");
assert_eq!(service.protocol, ServiceProtocol::Tcp);
assert_eq!(service.local_port, 22);
assert_eq!(service.remote_port, 0);
assert_eq!(service.assigned_port, Some(30000)); }
#[test]
fn test_add_service_specific_port() {
let registry = create_test_registry();
let tunnel = create_test_tunnel(®istry, "token123");
let service = registry
.add_service(
tunnel.id,
"ssh",
ServiceProtocol::Tcp,
22,
30050, )
.unwrap();
assert_eq!(service.assigned_port, Some(30050));
}
#[test]
fn test_add_service_port_conflict() {
let registry = create_test_registry();
let tunnel = create_test_tunnel(®istry, "token123");
registry
.add_service(tunnel.id, "ssh", ServiceProtocol::Tcp, 22, 30050)
.unwrap();
let result = registry.add_service(tunnel.id, "ssh2", ServiceProtocol::Tcp, 2222, 30050);
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("not available"));
}
#[test]
fn test_add_service_tunnel_not_found() {
let registry = create_test_registry();
let result = registry.add_service(
Uuid::new_v4(), "ssh",
ServiceProtocol::Tcp,
22,
0,
);
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("not found"));
}
#[test]
fn test_remove_service() {
let registry = create_test_registry();
let tunnel = create_test_tunnel(®istry, "token123");
let service = registry
.add_service(tunnel.id, "ssh", ServiceProtocol::Tcp, 22, 30050)
.unwrap();
assert_eq!(registry.service_count(), 1);
assert!(!registry.is_port_available(30050));
let removed = registry.remove_service(tunnel.id, service.id).unwrap();
assert_eq!(removed.name, "ssh");
assert_eq!(registry.service_count(), 0);
assert!(registry.is_port_available(30050)); }
#[test]
fn test_remove_service_not_found() {
let registry = create_test_registry();
let tunnel = create_test_tunnel(®istry, "token123");
let result = registry.remove_service(tunnel.id, Uuid::new_v4());
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("not found"));
}
#[test]
fn test_get_service_by_port() {
let registry = create_test_registry();
let tunnel = create_test_tunnel(®istry, "token123");
let service = registry
.add_service(tunnel.id, "ssh", ServiceProtocol::Tcp, 22, 30050)
.unwrap();
let (found_tunnel, found_service) = registry.get_service_by_port(30050).unwrap();
assert_eq!(found_tunnel.id, tunnel.id);
assert_eq!(found_service.id, service.id);
assert!(registry.get_service_by_port(30051).is_none());
}
#[test]
fn test_multiple_tunnels_multiple_services() {
let registry = create_test_registry();
let tunnel1 = create_test_tunnel(®istry, "token1");
let tunnel2 = create_test_tunnel(®istry, "token2");
registry
.add_service(tunnel1.id, "ssh", ServiceProtocol::Tcp, 22, 0)
.unwrap();
registry
.add_service(tunnel1.id, "postgres", ServiceProtocol::Tcp, 5432, 0)
.unwrap();
registry
.add_service(tunnel2.id, "game", ServiceProtocol::Udp, 27015, 0)
.unwrap();
assert_eq!(registry.tunnel_count(), 2);
assert_eq!(registry.service_count(), 3);
let t1 = registry.get_tunnel(tunnel1.id).unwrap();
let t2 = registry.get_tunnel(tunnel2.id).unwrap();
assert_eq!(t1.service_count(), 2);
assert_eq!(t2.service_count(), 1);
}
#[test]
fn test_list_tunnels() {
let registry = create_test_registry();
create_test_tunnel(®istry, "token1");
create_test_tunnel(®istry, "token2");
create_test_tunnel(®istry, "token3");
let tunnels = registry.list_tunnels();
assert_eq!(tunnels.len(), 3);
for info in &tunnels {
assert!(info.name.is_some());
assert_eq!(info.service_count, 0);
}
}
#[test]
fn test_touch_and_activity_tracking() {
let registry = create_test_registry();
let tunnel = create_test_tunnel(®istry, "token123");
let initial_activity = tunnel.last_activity();
std::thread::sleep(Duration::from_millis(10));
registry.touch_tunnel(tunnel.id);
let updated_activity = tunnel.last_activity();
assert!(updated_activity > initial_activity);
}
#[test]
fn test_cleanup_stale() {
let registry = create_test_registry();
let tunnel1 = create_test_tunnel(®istry, "token1");
let tunnel2 = create_test_tunnel(®istry, "token2");
registry.touch_tunnel(tunnel2.id);
std::thread::sleep(Duration::from_millis(50));
let stale = registry.cleanup_stale(Duration::from_secs(3600)); assert!(stale.is_empty());
assert!(registry.get_tunnel(tunnel1.id).is_some());
assert!(registry.get_tunnel(tunnel2.id).is_some());
}
#[test]
fn test_port_pool_allocation() {
let registry = TunnelRegistry::new((30000, 30002)); let tunnel = create_test_tunnel(®istry, "token123");
let s1 = registry
.add_service(tunnel.id, "s1", ServiceProtocol::Tcp, 1, 0)
.unwrap();
let s2 = registry
.add_service(tunnel.id, "s2", ServiceProtocol::Tcp, 2, 0)
.unwrap();
let s3 = registry
.add_service(tunnel.id, "s3", ServiceProtocol::Tcp, 3, 0)
.unwrap();
assert_eq!(s1.assigned_port, Some(30000));
assert_eq!(s2.assigned_port, Some(30001));
assert_eq!(s3.assigned_port, Some(30002));
let result = registry.add_service(tunnel.id, "s4", ServiceProtocol::Tcp, 4, 0);
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("no ports available"));
registry.remove_service(tunnel.id, s2.id).unwrap();
let s4 = registry
.add_service(tunnel.id, "s4", ServiceProtocol::Tcp, 4, 0)
.unwrap();
assert_eq!(s4.assigned_port, Some(30001)); }
#[test]
fn test_port_pool_out_of_range() {
let registry = TunnelRegistry::new((30000, 30010));
let tunnel = create_test_tunnel(®istry, "token123");
let result = registry.add_service(tunnel.id, "s1", ServiceProtocol::Tcp, 1, 25000);
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("not available"));
}
#[test]
fn test_available_ports() {
let registry = TunnelRegistry::new((30000, 30009)); let tunnel = create_test_tunnel(®istry, "token123");
assert_eq!(registry.available_ports(), 10);
registry
.add_service(tunnel.id, "s1", ServiceProtocol::Tcp, 1, 0)
.unwrap();
assert_eq!(registry.available_ports(), 9);
registry
.add_service(tunnel.id, "s2", ServiceProtocol::Tcp, 2, 30005)
.unwrap();
assert_eq!(registry.available_ports(), 8);
}
#[test]
fn test_is_port_available() {
let registry = TunnelRegistry::new((30000, 30010));
let tunnel = create_test_tunnel(®istry, "token123");
assert!(registry.is_port_available(30005));
assert!(!registry.is_port_available(29999)); assert!(!registry.is_port_available(30011));
registry
.add_service(tunnel.id, "s1", ServiceProtocol::Tcp, 1, 30005)
.unwrap();
assert!(!registry.is_port_available(30005)); }
#[test]
fn test_tunnel_service_connection_count() {
let service = TunnelService::new("test".to_string(), ServiceProtocol::Tcp, 22, 2222);
assert_eq!(service.connection_count(), 0);
service.increment_connections();
service.increment_connections();
assert_eq!(service.connection_count(), 2);
service.decrement_connections();
assert_eq!(service.connection_count(), 1);
}
#[test]
fn test_tunnel_service_clone() {
let service = TunnelService::new("test".to_string(), ServiceProtocol::Tcp, 22, 2222);
service.increment_connections();
service.increment_connections();
let cloned = service.clone();
assert_eq!(cloned.id, service.id);
assert_eq!(cloned.name, service.name);
assert_eq!(cloned.connection_count(), 2);
}
#[test]
fn test_tunnel_is_idle_for() {
let (tx, _rx) = mpsc::channel(16);
let tunnel = Tunnel::new("token".to_string(), None, tx, None);
assert!(!tunnel.is_idle_for(Duration::from_secs(1)));
std::thread::sleep(Duration::from_millis(50));
assert!(tunnel.is_idle_for(Duration::from_millis(10)));
tunnel.touch();
assert!(!tunnel.is_idle_for(Duration::from_millis(10)));
}
#[test]
fn test_tunnel_info_from() {
let (tx, _rx) = mpsc::channel(16);
let tunnel = Tunnel::new(
"token".to_string(),
Some("my-tunnel".to_string()),
tx,
Some("192.168.1.1:1234".parse().unwrap()),
);
let info = TunnelInfo::from(&tunnel);
assert_eq!(info.id, tunnel.id);
assert_eq!(info.name, Some("my-tunnel".to_string()));
assert_eq!(info.service_count, 0);
assert_eq!(info.client_addr, Some("192.168.1.1:1234".parse().unwrap()));
}
#[test]
fn test_default_registry() {
let registry = TunnelRegistry::default();
assert_eq!(registry.tunnel_count(), 0);
assert_eq!(registry.available_ports(), 1001);
}
#[tokio::test]
async fn test_tunnel_send_control() {
let (tx, mut rx) = mpsc::channel(16);
let tunnel = Tunnel::new("token".to_string(), None, tx, None);
tunnel
.send_control(ControlMessage::Heartbeat { timestamp: 12345 })
.await
.unwrap();
let msg = rx.recv().await.unwrap();
match msg {
ControlMessage::Heartbeat { timestamp } => assert_eq!(timestamp, 12345),
_ => panic!("unexpected message"),
}
}
#[test]
fn test_tunnel_try_send_control() {
let (tx, _rx) = mpsc::channel(16);
let tunnel = Tunnel::new("token".to_string(), None, tx, None);
let result = tunnel.try_send_control(ControlMessage::Heartbeat { timestamp: 12345 });
assert!(result.is_ok());
}
#[test]
fn test_tunnel_try_send_control_full_channel() {
let (tx, _rx) = mpsc::channel(1); let tunnel = Tunnel::new("token".to_string(), None, tx, None);
tunnel
.try_send_control(ControlMessage::Heartbeat { timestamp: 1 })
.unwrap();
let result = tunnel.try_send_control(ControlMessage::Heartbeat { timestamp: 2 });
assert!(result.is_err());
}
}