use crate::error::{Result, RuitlError};
use colored::*;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use std::convert::Infallible;
use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;
const RELOAD_JS_TEMPLATE: &str = r#"(() => {
const es = new EventSource("__RUITL_RELOAD_URL__");
es.addEventListener("reload", () => window.location.reload());
es.addEventListener("ping", () => {}); // keep-alive noop
window.addEventListener("beforeunload", () => es.close());
})();
"#;
#[derive(Debug, Clone)]
pub struct DevOptions {
pub reload_port: u16,
pub verbose: bool,
}
impl Default for DevOptions {
fn default() -> Self {
Self {
reload_port: 35729,
verbose: false,
}
}
}
#[derive(Clone)]
struct ReloadBus {
tx: broadcast::Sender<()>,
}
impl ReloadBus {
fn new() -> Self {
let (tx, _) = broadcast::channel(16);
Self { tx }
}
fn subscribe(&self) -> broadcast::Receiver<()> {
self.tx.subscribe()
}
fn fire(&self) {
let _ = self.tx.send(());
}
}
pub async fn run_dev(src_dir: &Path, opts: DevOptions) -> Result<()> {
let bus = Arc::new(ReloadBus::new());
ruitl_compiler::compile_dir_sibling(src_dir)
.map_err(|e| RuitlError::generic(format!("Initial compile failed: {}", e)))?;
println!("{}", "✓ Initial compile OK".green());
#[cfg(feature = "dev")]
{
let src_owned = src_dir.to_path_buf();
let bus_for_watch = Arc::clone(&bus);
let verbose = opts.verbose;
tokio::task::spawn_blocking(move || {
if let Err(e) = run_watcher_blocking(&src_owned, bus_for_watch, verbose) {
eprintln!("{} watcher failed: {}", "error:".red(), e);
}
});
}
#[cfg(not(feature = "dev"))]
{
return Err(RuitlError::generic(
"`ruitl dev` requires the 'dev' feature. Rebuild with `cargo build --features dev`.",
));
}
let addr: SocketAddr = ([127, 0, 0, 1], opts.reload_port).into();
println!(
"{} reload server on http://{}",
"✓".green(),
addr.to_string().bright_blue()
);
println!(
" Script tag: {}",
format!(
"<script src=\"http://{}/ruitl/reload.js\"></script>",
addr
)
.bright_black()
);
println!(" Press Ctrl+C to stop.");
let bus_for_server = Arc::clone(&bus);
let make_svc = make_service_fn(move |_| {
let bus = Arc::clone(&bus_for_server);
let port = opts.reload_port;
async move {
let bus = bus.clone();
Ok::<_, Infallible>(service_fn(move |req| {
let bus = bus.clone();
async move { handle_request(req, bus, port).await }
}))
}
});
Server::bind(&addr)
.serve(make_svc)
.await
.map_err(|e| RuitlError::generic(format!("Reload server error: {}", e)))?;
Ok(())
}
#[cfg(feature = "dev")]
fn run_watcher_blocking(
src_dir: &Path,
bus: Arc<ReloadBus>,
verbose: bool,
) -> Result<()> {
use hotwatch::{Event, Hotwatch};
use std::path::PathBuf;
let mut hotwatch = Hotwatch::new_with_custom_delay(Duration::from_millis(150))
.map_err(|e| RuitlError::generic(format!("Failed to start watcher: {}", e)))?;
let src_owned = src_dir.to_path_buf();
hotwatch
.watch(src_dir, move |event: Event| {
let changed: Option<&PathBuf> = match &event {
Event::Create(p)
| Event::Write(p)
| Event::Remove(p)
| Event::Rename(p, _) => Some(p),
_ => None,
};
let Some(path) = changed else { return };
if path.extension().map(|e| e != "ruitl").unwrap_or(true) {
return;
}
if verbose {
println!(
"{} change in {}",
"info:".bright_blue().bold(),
path.display()
);
}
match ruitl_compiler::compile_dir_sibling(&src_owned) {
Ok(_) => {
println!("{} recompiled, notifying browsers", "✓".green());
bus.fire();
}
Err(e) => {
eprintln!("{} recompile failed: {}", "error:".red(), e);
}
}
})
.map_err(|e| RuitlError::generic(format!("Failed to watch '{}': {}", src_dir.display(), e)))?;
loop {
std::thread::sleep(Duration::from_secs(60));
}
}
async fn handle_request(
req: Request<Body>,
bus: Arc<ReloadBus>,
port: u16,
) -> std::result::Result<Response<Body>, Infallible> {
match (req.method(), req.uri().path()) {
(&Method::GET, "/ruitl/reload.js") => Ok(reload_js_response(port)),
(&Method::GET, "/ruitl/reload") => Ok(sse_response(bus.subscribe())),
_ => Ok(not_found()),
}
}
fn reload_js_response(port: u16) -> Response<Body> {
let body = RELOAD_JS_TEMPLATE
.replace(
"__RUITL_RELOAD_URL__",
&format!("http://127.0.0.1:{}/ruitl/reload", port),
);
Response::builder()
.header("content-type", "application/javascript; charset=utf-8")
.header("cache-control", "no-cache")
.header("access-control-allow-origin", "*")
.body(Body::from(body))
.unwrap()
}
fn sse_response(rx: broadcast::Receiver<()>) -> Response<Body> {
use futures::stream::StreamExt;
use tokio_stream::wrappers::{BroadcastStream, IntervalStream};
let reloads = BroadcastStream::new(rx).filter_map(|item| async move {
match item {
Ok(_) => Some(hyper::body::Bytes::from(
"event: reload\ndata: \n\n".to_string(),
)),
Err(_) => None,
}
});
let mut interval = tokio::time::interval(Duration::from_secs(30));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
let pings = IntervalStream::new(interval)
.map(|_| hyper::body::Bytes::from("event: ping\ndata: \n\n".to_string()));
let hello = futures::stream::once(async {
hyper::body::Bytes::from(":connected\n\n".to_string())
});
let merged = hello
.chain(futures::stream::select(reloads, pings))
.map(Ok::<_, Infallible>);
Response::builder()
.header("content-type", "text/event-stream")
.header("cache-control", "no-cache")
.header("access-control-allow-origin", "*")
.body(Body::wrap_stream(merged))
.unwrap()
}
fn not_found() -> Response<Body> {
let mut r = Response::new(Body::from("not found"));
*r.status_mut() = StatusCode::NOT_FOUND;
r
}
pub fn reload_script_tag(port: u16) -> String {
format!(
r#"<script src="http://127.0.0.1:{}/ruitl/reload.js"></script>"#,
port
)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn reload_bus_fires_to_subscribers() {
let bus = ReloadBus::new();
let mut rx = bus.subscribe();
bus.fire();
assert!(rx.try_recv().is_ok(), "bus tick must reach subscriber");
}
#[test]
fn reload_bus_no_receivers_no_panic() {
let bus = ReloadBus::new();
bus.fire(); }
#[test]
fn script_tag_embeds_port() {
let t = reload_script_tag(12345);
assert!(t.contains(":12345/"));
assert!(t.contains("<script src=\""));
}
}