refractium/core/mod.rs
1/// Balancer module for load balancing logic.
2pub mod balancer;
3/// Builder module for the engine.
4mod builder;
5/// Health module for backend monitoring.
6pub mod health;
7/// Proxy module for TCP tunneling.
8pub mod proxy;
9/// Router module for protocol-based routing.
10pub mod router;
11/// TCP server implementation.
12pub mod tcp;
13/// Types used across the core module.
14pub mod types;
15/// UDP server implementation.
16pub mod udp;
17
18pub use builder::RefractiumBuilder;
19
20use crate::core::types::ProtocolRoute;
21use crate::errors::Result;
22use router::Router;
23use std::collections::HashMap;
24use std::net::SocketAddr;
25use std::sync::Arc;
26use tcp::TcpServer;
27use tokio_util::sync::CancellationToken;
28use udp::UdpServer;
29
30use self::health::HealthMonitor;
31
32/// The main engine for the Refractium proxy.
33///
34/// `Refractium` manages the lifecycle of TCP and UDP servers, protocol identification,
35/// and backend health monitoring. It is designed to be highly concurrent and supports
36/// atomic routing table updates via `reload_routes`.
37///
38/// # Examples
39///
40/// ```rust,no_run
41/// use refractium::{Refractium, Http, types::{ProtocolRoute, ForwardTarget}};
42/// use std::sync::Arc;
43///
44/// #[tokio::main]
45/// async fn main() -> anyhow::Result<()> {
46/// let routes = vec![ProtocolRoute {
47/// protocol: Arc::new(Http),
48/// sni: None,
49/// forward_to: ForwardTarget::Single("127.0.0.1:8080".to_string()),
50/// }];
51///
52/// let refractium = Refractium::builder()
53/// .routes(routes, Vec::new())
54/// .build()?;
55///
56/// refractium.run_tcp("0.0.0.0:80".parse()?).await?;
57/// Ok(())
58/// }
59/// ```
60pub struct Refractium {
61 pub(crate) router_tcp: Arc<Router>,
62 pub(crate) router_udp: Arc<Router>,
63 pub(crate) health: Arc<HealthMonitor>,
64 pub(crate) peek_buffer_size: usize,
65 pub(crate) peek_timeout_ms: u64,
66 pub(crate) max_connections: usize,
67 pub(crate) max_connections_per_ip: usize,
68 pub(crate) cancel_token: CancellationToken,
69}
70
71impl Refractium {
72 /// Returns a new [`RefractiumBuilder`] with default settings.
73 #[must_use]
74 pub const fn builder() -> RefractiumBuilder {
75 RefractiumBuilder::new()
76 }
77
78 /// Atomically reloads the routing table for all active servers.
79 ///
80 /// This method updates the internal load balancers and starts monitoring any new
81 /// backend addresses. Active connections are not dropped during the reload.
82 pub async fn reload_routes(&self, tcp: Vec<ProtocolRoute>, udp: Vec<ProtocolRoute>) {
83 let mut targets = tcp
84 .iter()
85 .flat_map(|r| r.forward_to.to_vec())
86 .collect::<Vec<_>>();
87 targets.extend(udp.iter().flat_map(|r| r.forward_to.to_vec()));
88
89 self.router_tcp
90 .update_balancer(tcp, Arc::clone(&self.health))
91 .await;
92 self.router_udp
93 .update_balancer(udp, Arc::clone(&self.health))
94 .await;
95
96 self.health.start_monitoring(targets);
97 }
98
99 /// Returns the [`CancellationToken`] used by the engine.
100 ///
101 /// This can be used to trigger a graceful shutdown from external logic.
102 #[must_use]
103 pub fn cancel_token(&self) -> CancellationToken {
104 self.cancel_token.clone()
105 }
106
107 /// Starts the TCP server on the provided address.
108 ///
109 /// This method will block until the server is shut down or an unrecoverable error occurs.
110 ///
111 /// # Errors
112 ///
113 /// Returns a [`crate::errors::RefractiumError::BindError`] if the address is already in use
114 /// or other IO errors occur during startup.
115 pub async fn run_tcp(&self, addr: SocketAddr) -> Result<()> {
116 TcpServer::new(
117 addr,
118 Arc::clone(&self.router_tcp),
119 Arc::clone(&self.health),
120 self.peek_buffer_size,
121 self.peek_timeout_ms,
122 self.max_connections,
123 self.max_connections_per_ip,
124 self.cancel_token.clone(),
125 )
126 .start()
127 .await
128 }
129
130 /// Starts the UDP server on the provided address.
131 ///
132 /// # Errors
133 ///
134 /// Returns a [`crate::errors::RefractiumError::BindError`] if the address is already in use.
135 pub async fn run_udp(&self, addr: SocketAddr) -> Result<()> {
136 UdpServer::new(
137 addr,
138 Arc::clone(&self.router_udp),
139 Arc::clone(&self.health),
140 self.cancel_token.clone(),
141 )
142 .start()
143 .await
144 }
145
146 /// Starts both TCP and UDP servers concurrently.
147 ///
148 /// # Errors
149 ///
150 /// Returns an error if either the TCP or UDP server fails to bind.
151 pub async fn run_both(&self, addr: SocketAddr) -> Result<()> {
152 tokio::try_join!(self.run_tcp(addr), self.run_udp(addr))?;
153 Ok(())
154 }
155
156 /// Prints a visual health report of all configured backends to stdout.
157 ///
158 /// This is mainly used for debugging or CLI reporting.
159 pub async fn report_health(&self) {
160 let tcp_status = self.router_tcp.get_health_status().await;
161 let udp_status = self.router_udp.get_health_status().await;
162 if !tcp_status.is_empty() {
163 println!("\n[TCP Backends]");
164 Self::print_status_map(tcp_status);
165 }
166 if !udp_status.is_empty() {
167 println!("\n[UDP Backends]");
168 Self::print_status_map(udp_status);
169 }
170 println!();
171 }
172
173 fn print_status_map(status: HashMap<String, Vec<(String, bool)>>) {
174 for (proto, backends) in status {
175 print!(" {proto} -> ");
176 for (idx, (addr, alive)) in backends.iter().enumerate() {
177 if idx > 0 {
178 print!(", ");
179 }
180 let s = if *alive {
181 "\x1b[32mUP\x1b[0m"
182 } else {
183 "\x1b[31mDOWN\x1b[0m"
184 };
185 print!("{addr} [{s}]");
186 }
187 println!();
188 }
189 }
190}