siphon_server/
state.rs

1use std::sync::atomic::{AtomicU64, Ordering};
2use std::sync::Arc;
3
4use dashmap::DashMap;
5use parking_lot::RwLock;
6use tokio::sync::{mpsc, oneshot};
7
/// An HTTP response produced by a tunnel client.
///
/// Plain data carrier: all fields are public and no invariants are enforced
/// by this type.
#[derive(Debug)]
pub struct HttpResponseData {
    /// Numeric HTTP status code of the response.
    pub status: u16,
    /// Response headers as string pairs (presumably `(name, value)` — confirm
    /// against the producer).
    pub headers: Vec<(String, String)>,
    /// Raw bytes of the response body.
    pub body: Vec<u8>,
}
15
16/// Shared registry for pending HTTP responses
17/// Maps stream_id -> response sender channel
18pub type ResponseRegistry = Arc<DashMap<u64, oneshot::Sender<HttpResponseData>>>;
19
20/// Create a new response registry
21pub fn new_response_registry() -> ResponseRegistry {
22    Arc::new(DashMap::new())
23}
24
25/// Handle to a TCP connection's write half and associated data
26pub struct TcpConnectionHandle {
27    pub writer: mpsc::Sender<Vec<u8>>,
28    #[allow(dead_code)]
29    pub subdomain: String,
30}
31
32/// Shared registry for TCP connections
33/// Maps stream_id -> TCP connection handle
34pub type TcpConnectionRegistry = Arc<DashMap<u64, TcpConnectionHandle>>;
35
36/// Create a new TCP connection registry
37pub fn new_tcp_connection_registry() -> TcpConnectionRegistry {
38    Arc::new(DashMap::new())
39}
40
41/// Port allocator for TCP tunnels
42pub struct PortAllocator {
43    start: u16,
44    end: u16,
45    allocated: RwLock<std::collections::HashSet<u16>>,
46}
47
48impl PortAllocator {
49    pub fn new(start: u16, end: u16) -> Arc<Self> {
50        Arc::new(Self {
51            start,
52            end,
53            allocated: RwLock::new(std::collections::HashSet::new()),
54        })
55    }
56
57    /// Allocate the next available port
58    pub fn allocate(&self) -> Option<u16> {
59        let mut allocated = self.allocated.write();
60        for port in self.start..=self.end {
61            if !allocated.contains(&port) {
62                allocated.insert(port);
63                return Some(port);
64            }
65        }
66        None
67    }
68
69    /// Release a port back to the pool
70    pub fn release(&self, port: u16) {
71        let mut allocated = self.allocated.write();
72        allocated.remove(&port);
73    }
74
75    /// Check if a port is allocated
76    #[allow(dead_code)]
77    pub fn is_allocated(&self, port: u16) -> bool {
78        self.allocated.read().contains(&port)
79    }
80}
81
/// Source of stream IDs shared across all planes.
///
/// IDs start at 1 and grow by one per call to [`next`](Self::next).
pub struct StreamIdGenerator {
    /// Next ID to hand out.
    counter: AtomicU64,
}

impl StreamIdGenerator {
    /// Creates a generator whose first ID is 1, already wrapped in an
    /// [`Arc`] so it can be cloned to every user.
    pub fn new() -> Arc<Self> {
        Arc::new(Self::default())
    }

    /// Returns the current ID and advances the counter by one.
    ///
    /// Uses `Ordering::Relaxed`: only the atomicity of the increment is
    /// relied upon, not ordering with other memory operations.
    pub fn next(&self) -> u64 {
        self.counter.fetch_add(1, Ordering::Relaxed)
    }
}

impl Default for StreamIdGenerator {
    /// Creates an unwrapped generator whose first ID is 1.
    fn default() -> Self {
        let counter = AtomicU64::new(1);
        Self { counter }
    }
}