1use anyhow::Result;
2use axum::{
3 extract::ws::{Message, WebSocket, WebSocketUpgrade},
4 http::header,
5 response::{Html, IntoResponse, Response},
6 routing::get,
7 Extension, Router,
8};
9use clap::Parser;
10use drop_this::DropResult;
11use notify::RecommendedWatcher;
12use notify_debouncer_mini::{new_debouncer, DebouncedEvent, Debouncer};
13use pdf_reading::PdfReader;
14use std::{
15 net::SocketAddr,
16 path::{Path, PathBuf},
17 time::{Duration, SystemTime},
18};
19use tokio::{
20 fs::{metadata, read},
21 net::TcpListener,
22 sync::watch,
23};
24use tokio_gen_server::prelude::*;
25use tower::ServiceBuilder;
26use tracing::*;
27
28mod pdf_reading;
29
// Command-line options for the PDF live server, parsed with `clap`'s derive API.
// NOTE: plain `//` comments are used on purpose — `clap` turns `///` doc
// comments on this struct and its fields into `--help` text.
#[derive(Parser, Debug)]
struct Args {
    // `--watch-dir` / `-d`: directory handed to the file watcher.
    // Defaults to the current directory.
    #[arg(long, short = 'd', default_value = "./")]
    watch_dir: PathBuf,

    // `--served-pdf` / `-f`: path of the PDF handed to the `PdfReader` actor.
    // No default value is declared for it.
    #[arg(long, short = 'f')]
    served_pdf: PathBuf,

    // `--socket-addr` / `-s`: address the HTTP listener binds to.
    #[arg(long, short = 's', default_value = "127.0.0.1:3000")]
    socket_addr: SocketAddr,
}
45
/// Value carried by the `watch` channel: the channel starts out as `None`,
/// and a `Some(bytes)` value holds the PDF contents that get served.
type OptBytes = Option<Vec<u8>>;
47
48pub async fn run() -> Result<()> {
49 let args = Args::parse();
50 let (tx, rx) = watch::channel::<OptBytes>(None);
51
52 let pdf_reader = PdfReader {
53 served_pdf: args.served_pdf.clone(),
54 tx: tx.clone(),
55 current_modified_time: None,
56 };
57 let (actor_handle, actor_ref) = pdf_reader.spawn();
58 let _keep_debouncer_alive = start_watcher(args.watch_dir, actor_ref.clone())?;
59
60 let app = Router::new()
61 .route("/", get(serve_html))
62 .route("/main.mjs", get(serve_js))
63 .route("/__pdf_live_server_ws", get(ws_handler))
64 .route("/served.pdf", get(serve_pdf))
65 .layer(ServiceBuilder::new().layer(Extension(tx)))
66 .layer(ServiceBuilder::new().layer(Extension(rx)));
67
68 let listener = TcpListener::bind(args.socket_addr).await?;
69 info!("Starting to listen on {}.", args.socket_addr);
70 axum::serve(listener, app).await?;
71 actor_ref.cancel();
72 actor_handle.await?.exit_result?;
73 Ok(())
74}
75
76async fn serve_html() -> Html<&'static str> {
77 Html(include_str!("index.html"))
78}
79
80async fn serve_js() -> impl IntoResponse {
81 (
82 [(header::CONTENT_TYPE, "application/javascript")],
83 include_str!("main.mjs"),
84 )
85}
86async fn serve_pdf(Extension(mut rx): Extension<watch::Receiver<OptBytes>>) -> impl IntoResponse {
87 let maybe_bytes = rx.borrow().clone();
88 let pdf_bytes = match maybe_bytes {
89 pdf_bytes @ Some(_) => pdf_bytes,
90 None => {
91 warn!("No PDF bytes to serve for the route yet. Waiting.");
92 await_pdf_bytes(&mut rx).await
93 }
94 }
95 .unwrap_or(b"We must be shutting down.".into());
96 ([(header::CONTENT_TYPE, "application/pdf")], pdf_bytes)
97}
98
99async fn await_pdf_bytes(rx: &mut watch::Receiver<OptBytes>) -> Option<Vec<u8>> {
100 match rx.changed().await {
101 Ok(_) => rx.borrow().clone(),
102 _ => None,
103 }
104}
105
106async fn ws_handler(
107 ws: WebSocketUpgrade,
108 Extension(tx): Extension<watch::Sender<OptBytes>>,
109) -> Response {
110 ws.on_upgrade(move |socket| handle_socket(socket, tx.subscribe()))
111}
112
113async fn handle_socket(mut socket: WebSocket, mut rx: watch::Receiver<OptBytes>) {
114 debug!("Connected via WebSocket.");
115 while rx.changed().await.is_ok() {
116 let msg = Message::Binary(rx.borrow().clone().expect("Updates are all `Some`."));
117 if socket.send(msg).await.is_err() {
118 break;
119 }
120 debug!("Sent message via WebSocket.");
121 }
122 info!("Closing WebSocket connection.");
123}
124
/// A 100-millisecond duration; `start_watcher` passes it to `new_debouncer`
/// as the debounce timeout.
pub const MS100: Duration = Duration::from_millis(100);
126
127fn start_watcher(
128 watch_dir: PathBuf,
129 actor_ref: ActorRef<PdfReader>,
130) -> notify::Result<Debouncer<RecommendedWatcher>> {
131 let event_handler = move |event: notify::Result<Vec<DebouncedEvent>>| match event {
132 Err(err) => error!(?err, "file watcher"),
133 _ => actor_ref.blocking_cast(()).drop_result(),
134 };
135
136 let mut debouncer = new_debouncer(MS100, event_handler)?;
137 debouncer
138 .watcher()
139 .watch(&watch_dir, notify::RecursiveMode::Recursive)?;
140 Ok(debouncer)
141}