pdf_live_server/
lib.rs

1use anyhow::Result;
2use axum::{
3    extract::ws::{Message, WebSocket, WebSocketUpgrade},
4    http::header,
5    response::{Html, IntoResponse, Response},
6    routing::get,
7    Extension, Router,
8};
9use clap::Parser;
10use drop_this::DropResult;
11use notify::RecommendedWatcher;
12use notify_debouncer_mini::{new_debouncer, DebouncedEvent, Debouncer};
13use pdf_reading::PdfReader;
14use std::{
15    net::SocketAddr,
16    path::{Path, PathBuf},
17    time::{Duration, SystemTime},
18};
19use tokio::{
20    fs::{metadata, read},
21    net::TcpListener,
22    sync::watch,
23};
24use tokio_gen_server::prelude::*;
25use tower::ServiceBuilder;
26use tracing::*;
27
28mod pdf_reading;
29
30/// Serve a PDF file live and reload the browser on changes.
31#[derive(Parser, Debug)]
32struct Args {
33    /// Directory to watch for changes.
34    #[arg(long, short = 'd', default_value = "./")]
35    watch_dir: PathBuf,
36
37    /// PDF file to serve. I also check its modified time to decide if changes occur.
38    #[arg(long, short = 'f')]
39    served_pdf: PathBuf,
40
41    /// Address to bind the server.
42    #[arg(long, short = 's', default_value = "127.0.0.1:3000")]
43    socket_addr: SocketAddr,
44}
45
/// Payload carried by the watch channel: the bytes of the served PDF, or `None` while
/// nothing has been published yet (the channel is created holding `None`).
type OptBytes = Option<Vec<u8>>;
47
48pub async fn run() -> Result<()> {
49    let args = Args::parse();
50    let (tx, rx) = watch::channel::<OptBytes>(None);
51
52    let pdf_reader = PdfReader {
53        served_pdf: args.served_pdf.clone(),
54        tx: tx.clone(),
55        current_modified_time: None,
56    };
57    let (actor_handle, actor_ref) = pdf_reader.spawn();
58    let _keep_debouncer_alive = start_watcher(args.watch_dir, actor_ref.clone())?;
59
60    let app = Router::new()
61        .route("/", get(serve_html))
62        .route("/main.mjs", get(serve_js))
63        .route("/__pdf_live_server_ws", get(ws_handler))
64        .route("/served.pdf", get(serve_pdf))
65        .layer(ServiceBuilder::new().layer(Extension(tx)))
66        .layer(ServiceBuilder::new().layer(Extension(rx)));
67
68    let listener = TcpListener::bind(args.socket_addr).await?;
69    info!("Starting to listen on {}.", args.socket_addr);
70    axum::serve(listener, app).await?;
71    actor_ref.cancel();
72    actor_handle.await?.exit_result?;
73    Ok(())
74}
75
76async fn serve_html() -> Html<&'static str> {
77    Html(include_str!("index.html"))
78}
79
80async fn serve_js() -> impl IntoResponse {
81    (
82        [(header::CONTENT_TYPE, "application/javascript")],
83        include_str!("main.mjs"),
84    )
85}
86async fn serve_pdf(Extension(mut rx): Extension<watch::Receiver<OptBytes>>) -> impl IntoResponse {
87    let maybe_bytes = rx.borrow().clone();
88    let pdf_bytes = match maybe_bytes {
89        pdf_bytes @ Some(_) => pdf_bytes,
90        None => {
91            warn!("No PDF bytes to serve for the route yet. Waiting.");
92            await_pdf_bytes(&mut rx).await
93        }
94    }
95    .unwrap_or(b"We must be shutting down.".into());
96    ([(header::CONTENT_TYPE, "application/pdf")], pdf_bytes)
97}
98
99async fn await_pdf_bytes(rx: &mut watch::Receiver<OptBytes>) -> Option<Vec<u8>> {
100    match rx.changed().await {
101        Ok(_) => rx.borrow().clone(),
102        _ => None,
103    }
104}
105
106async fn ws_handler(
107    ws: WebSocketUpgrade,
108    Extension(tx): Extension<watch::Sender<OptBytes>>,
109) -> Response {
110    ws.on_upgrade(move |socket| handle_socket(socket, tx.subscribe()))
111}
112
113async fn handle_socket(mut socket: WebSocket, mut rx: watch::Receiver<OptBytes>) {
114    debug!("Connected via WebSocket.");
115    while rx.changed().await.is_ok() {
116        let msg = Message::Binary(rx.borrow().clone().expect("Updates are all `Some`."));
117        if socket.send(msg).await.is_err() {
118            break;
119        }
120        debug!("Sent message via WebSocket.");
121    }
122    info!("Closing WebSocket connection.");
123}
124
/// One hundred milliseconds; used as the debounce timeout of the file watcher.
pub const MS100: Duration = Duration::from_millis(100);
126
127fn start_watcher(
128    watch_dir: PathBuf,
129    actor_ref: ActorRef<PdfReader>,
130) -> notify::Result<Debouncer<RecommendedWatcher>> {
131    let event_handler = move |event: notify::Result<Vec<DebouncedEvent>>| match event {
132        Err(err) => error!(?err, "file watcher"),
133        _ => actor_ref.blocking_cast(()).drop_result(),
134    };
135
136    let mut debouncer = new_debouncer(MS100, event_handler)?;
137    debouncer
138        .watcher()
139        .watch(&watch_dir, notify::RecursiveMode::Recursive)?;
140    Ok(debouncer)
141}