ptth_server 2.1.0

The server for PTTH
Documentation
// Static file server that can plug into the PTTH reverse server

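// Permits glob imports of enum variants (e.g. `use internal::Response::*` in
// `FileServer::serve_all`). I'm not sure I like this exemption.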
#![allow (clippy::enum_glob_use)]

use std::{
	cmp::min,
	collections::HashMap,
	convert::{Infallible, TryFrom},
	io::SeekFrom,
	path::{
		Path,
		PathBuf,
	},
	sync::Arc,
};

use arc_swap::ArcSwap;
use handlebars::Handlebars;
use serde::Serialize;
use tokio::{
	fs::{
		DirEntry,
		File,
		ReadDir,
	},
	io::{
		AsyncReadExt,
		AsyncSeekExt,
	},
	sync::mpsc::{
		channel,
	},
};
use tracing::instrument;

use ptth_core::{
	http_serde::{
		Method,
		Response,
		StatusCode,
	},
	prelude::*,
};

pub mod errors;
pub mod metrics;

mod html;
mod internal;
#[cfg(feature = "markdown")]
mod markdown;
mod range;

use errors::FileServerError;

/// Settings for the file server.
#[derive (Default)]
pub struct Config {
	/// Directory to serve files from. `None` makes `FileServer::serve_all`
	/// fall back to the current working directory (`./`).
	pub file_server_root: Option <PathBuf>,
}

/// A static file server; requests go through `FileServer::serve_all`.
pub struct FileServer {
	/// Root directory and other settings.
	pub config: Config,
	/// Registered templates (`file_server_dir`, `file_server_root`),
	/// loaded by `load_templates`.
	pub handlebars: handlebars::Handlebars <'static>,
	/// Metrics captured once when the server is built; passed to
	/// `html::serve_dir` when rendering directory pages.
	pub metrics_startup: metrics::Startup,
	/// Shared, swappable interval metrics. Presumably updated by another
	/// task through the `ArcSwap` — confirm at the writer.
	pub metrics_interval: Arc <ArcSwap <Option <metrics::Interval>>>,
	/// Forwarded to `internal::serve_all` on every request (presumably a
	/// path that must not be served — confirm in `internal`).
	pub hidden_path: Option <PathBuf>,
}

impl FileServer {
	/// Builds a file server.
	///
	/// - `file_server_root`: directory to serve; `None` serves `./`.
	/// - `asset_root`: directory holding `handlebars/server/*.html` templates.
	/// - `name`: passed to `metrics::Startup::new`.
	/// - `metrics_interval`: shared interval metrics handle, stored as-is.
	/// - `hidden_path`: forwarded to the request router on every request.
	///
	/// # Errors
	///
	/// Fails if the Handlebars templates can't be loaded.
	pub fn new (
		file_server_root: Option <PathBuf>,
		asset_root: &Path,
		name: String,
		metrics_interval: Arc <ArcSwap <Option <metrics::Interval>>>,
		hidden_path: Option <PathBuf>,
	) -> Result <Self, FileServerError>
	{
		// Templates first: if they fail, nothing else needs to be built.
		let handlebars = load_templates (asset_root)?;
		let metrics_startup = metrics::Startup::new (name);
		let config = Config { file_server_root };
		
		Ok (Self {
			config,
			handlebars,
			metrics_startup,
			metrics_interval,
			hidden_path,
		})
	}
}

/// JSON body of a directory listing, serialized as `{"entries": [...]}`.
#[derive (Serialize)]
struct DirJson {
	/// Listing entries; `serve_dir_json` sorts them by name.
	entries: Vec <DirEntryJson>,
}

/// One entry of a JSON directory listing. Field names are the JSON keys.
#[derive (Serialize)]
struct DirEntryJson {
	/// File name. Entries whose names aren't valid UTF-8 are left out of
	/// the listing entirely.
	name: String,
	/// `len()` from the entry's metadata.
	size: u64,
	/// Whether the entry is a directory.
	is_dir: bool,
}

/// Converts one directory entry into its JSON form.
///
/// Returns `None`, so the caller skips the entry, when the name isn't valid
/// UTF-8 or the metadata can't be read.
async fn read_dir_entry_json (entry: DirEntry) -> Option <DirEntryJson>
{
	let name = match entry.file_name ().into_string () {
		Ok (name) => name,
		Err (_) => return None,
	};
	
	let md = entry.metadata ().await.ok ()?;
	
	Some (DirEntryJson {
		name,
		size: md.len (),
		is_dir: md.is_dir (),
	})
}

/// Serves a directory listing as `application/json; charset=UTF-8`.
///
/// Entries are sorted by name. Reading stops at the end of the directory or
/// at the first error from the directory stream. Entries whose name or
/// metadata can't be read are skipped, so the listing may be partial.
///
/// # Panics
///
/// Panics if the listing can't be serialized to JSON.
async fn serve_dir_json (
	mut dir: ReadDir
) -> Result <Response, FileServerError>
{
	let mut listing = Vec::new ();
	
	loop {
		let entry = match dir.next_entry ().await {
			Ok (Some (entry)) => entry,
			// End of directory or a read error: either way, stop listing
			Ok (None) | Err (_) => break,
		};
		
		if let Some (json_entry) = read_dir_entry_json (entry).await {
			listing.push (json_entry);
		}
	}
	
	listing.sort_unstable_by (|left, right| left.name.cmp (&right.name));
	
	let json = serde_json::to_string (&DirJson { entries: listing }).unwrap ();
	
	let mut response = Response::default ();
	response.header ("content-type".to_string (), "application/json; charset=UTF-8".to_string ().into_bytes ());
	response.body_bytes (json.into_bytes ());
	
	Ok (response)
}

/// Builds the response for a single regular file.
///
/// `serve_file_decision` chooses the status code and whether a body is sent:
/// - NotModified when the ETag matches `If-None-Match`.
/// - NoContent when the client doesn't want a body.
/// - PartialContent for a range request.
/// - Ok otherwise.
///
/// When a body is sent, `f` is seeked to the start of `range` and exactly
/// `range.end - range.start` bytes are streamed from a spawned task through
/// a channel.
///
/// # Errors
///
/// Returns an error if seeking to the start of the range fails.
#[instrument (level = "debug", skip (f))]
async fn serve_file (
	mut f: File,
	client_wants_body: bool,
	range: range::ValidParsed,
	if_none_match: Option <&[u8]>,
) 
-> Result <Response, FileServerError>
{
	// Tripping the etag through UTF-8 isn't the best way to encourage it to
	// be valid ASCII, but if I make it binary I might accidentally pass the
	// hash binary as a header, which is not valid.
	
	let actual_etag = get_file_etag (&f).await.map (String::into_bytes);
	
	let input = ServeFileInput {
		if_none_match,
		actual_etag,
		client_wants_body,
		range_requested: range.range_requested,
	};
	
	let decision = serve_file_decision (&input);
	
	let (range, range_requested) = (range.range, range.range_requested);
	
	info! ("Serving range {}-{}", range.start, range.end);
	
	let content_length = range.end - range.start;
	
	// The last field of Content-Range is the complete length of the file
	// (RFC 7233, section 4.2), not the end of the requested range. Look it up
	// now, while we still own `f`, because `f` may be moved into the
	// streaming task below. "*" is the RFC's spelling for an unknown length.
	let complete_length = if range_requested {
		let len = match f.metadata ().await {
			Ok (md) => md.len ().to_string (),
			Err (_) => String::from ("*"),
		};
		Some (len)
	}
	else {
		None
	};
	
	let body = if decision.should_send_body {
		let seek = SeekFrom::Start (range.start);
		f.seek (seek).await?;
		
		let (tx, rx) = channel (1);
		tokio::spawn (async move {
			stream_file (f, content_length, tx).await;
		});
		
		Some (rx)
	}
	else {
		None
	};
	
	let mut response = Response::default ();
	
	response.status_code (decision.status_code);
	
	// The cache-related headers in HTTP have bad names. See here:
	// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control
	// The intended semantics I'm using are:
	// - etag - Some random hashed value that changes whenever the metadata
	//   (name, inode number, length, mtime) of a file changes. Also changes
	//   on new server instance. Maybe.
	// - no-cache - Clients and the relay can store this, but should revalidate
	//   with the origin server (us) because only we can check if the file
	//   changed on disk.
	// - max-age=0 - The file might change at any point during or after the
	//   request, so for proper invalidation, the client should immediately
	//   consider it stale.
	
	response.header ("cache-control".to_string (), b"no-cache,max-age=0".to_vec ());
	if let Some (etag) = input.actual_etag {
		response.header ("etag".to_string (), etag);
	}
	response.header (String::from ("accept-ranges"), b"bytes".to_vec ());
	
	if let Some (complete_length) = complete_length {
		response.header (String::from ("content-range"), format! ("bytes {}-{}/{}", range.start, range.end - 1, complete_length).into_bytes ());
	}
	
	response.content_length = Some (content_length);
	
	if let Some (body) = body {
		response.body (body);
	}
	
	Ok (response)
}

/// Request facts that `serve_file_decision` needs, collected up front so
/// the decision itself stays synchronous and I/O-free.
#[derive (Debug)]
struct ServeFileInput <'a> {
	/// Raw `If-None-Match` header bytes from the client, if present.
	if_none_match: Option <&'a [u8]>,
	/// ETag computed for the file on disk, if one could be computed.
	actual_etag: Option <Vec <u8>>,
	/// `false` when the response must not carry a body (presumably HEAD —
	/// confirm in `internal`).
	client_wants_body: bool,
	/// Whether a byte range was requested (from `range::ValidParsed`).
	range_requested: bool,
}

/// What `serve_file_decision` decided for a request.
#[derive (Debug, PartialEq)]
struct ServeFileOutput {
	/// Status code to send.
	status_code: StatusCode,
	/// Whether `serve_file` should seek and stream the file contents.
	should_send_body: bool,
}

/// Picks the status code for a file response and whether to send its body.
///
/// Rules, highest precedence first:
/// 1. If the client's `If-None-Match` equals the file's ETag: NotModified, no body.
/// 2. Otherwise, if the client doesn't want a body: NoContent, no body.
/// 3. Otherwise, if a range was requested: PartialContent, with body.
/// 4. Otherwise: Ok, with body.
fn serve_file_decision (input: &ServeFileInput) -> ServeFileOutput
{
	let etag_matches = match (input.if_none_match, input.actual_etag.as_deref ()) {
		(Some (client_etag), Some (server_etag)) => client_etag == server_etag,
		_ => false,
	};
	
	let (status_code, should_send_body) = match (etag_matches, input.client_wants_body, input.range_requested) {
		(true, _, _) => (StatusCode::NotModified, false),
		(false, false, _) => (StatusCode::NoContent, false),
		(false, true, true) => (StatusCode::PartialContent, true),
		(false, true, false) => (StatusCode::Ok, true),
	};
	
	ServeFileOutput {
		status_code,
		should_send_body,
	}
}

/// Computes an ETag from the file's length and modification time.
///
/// Both values are encoded with `rmp_serde` and hashed with BLAKE3. The hex
/// digest is the tag. Returns `None` if the metadata or mtime can't be read,
/// or if encoding fails.
async fn get_file_etag (f: &File) -> Option <String>
{
	// Everything hashed into the tag: a change to either field changes it.
	#[derive (Serialize)]
	struct CacheBreaker {
		len: u64,
		mtime: std::time::SystemTime,
	}
	
	let metadata = match f.metadata ().await {
		Ok (metadata) => metadata,
		Err (_) => return None,
	};
	
	let breaker = CacheBreaker {
		len: metadata.len (),
		mtime: metadata.modified ().ok ()?,
	};
	
	let encoded = rmp_serde::to_vec (&breaker).ok ()?;
	let digest = blake3::hash (&encoded);
	
	Some (digest.to_hex ().to_string ())
}

/// Streams exactly `content_length` bytes of `f`, starting at its current
/// position, into `tx` in chunks of at most 64 KiB.
///
/// `serve_file` runs this in a spawned task, so there is nobody to return
/// an error to. A read error, an early end of file (e.g. the file shrank
/// mid-stream), or a dropped receiver each end the stream with a warning.
/// No more than `content_length` bytes are ever sent, so a range request
/// gets only the bytes it asked for.
async fn stream_file (
	mut f: File,
	content_length: u64,
	tx: tokio::sync::mpsc::Sender <Result <Vec <u8>, Infallible>>,
) {
	const CHUNK_SIZE: usize = 65_536;
	
	let mut bytes_sent: u64 = 0;
	let mut bytes_left = content_length;
	
	let mark_interval = 200_000;
	let mut next_mark = mark_interval;
	
	while bytes_left > 0 {
		// Size each read to what's left of the range, so bytes past the end
		// of the range are never read, let alone sent.
		let chunk_len = usize::try_from (bytes_left)
		.map_or (CHUNK_SIZE, |left| min (left, CHUNK_SIZE));
		
		let mut buffer = vec! [0_u8; chunk_len];
		let bytes_read = match f.read (&mut buffer).await {
			Ok (n) => n,
			Err (e) => {
				warn! ("Couldn't read from file: {} (Sent {} out of {} bytes)", e, bytes_sent, content_length);
				return;
			},
		};
		
		if bytes_read == 0 {
			warn! ("File ended early (Sent {} out of {} bytes)", bytes_sent, content_length);
			return;
		}
		
		buffer.truncate (bytes_read);
		
		let bytes_read_64 = u64::try_from (bytes_read).expect ("Couldn't fit usize into u64");
		
		if tx.send (Ok::<_, Infallible> (buffer)).await.is_err () {
			warn! ("Cancelling file stream (Sent {} out of {} bytes)", bytes_sent, content_length);
			return;
		}
		
		// Can't underflow: bytes_read <= chunk_len <= bytes_left
		bytes_left -= bytes_read_64;
		bytes_sent += bytes_read_64;
		
		while next_mark <= bytes_sent {
			trace! ("Sent {} bytes", next_mark);
			next_mark += mark_interval;
		}
	}
	
	debug! ("Finished");
}

impl FileServer {
	/// Top-level request handler for the file server module.
	/// 
	/// Passes the request to `internal::serve_all` and resolves `uri` against
	/// the configured root, or `./` when no root is configured. The resulting
	/// `internal::Response` becomes one of the following:
	/// - a directory listing (HTML or JSON);
	/// - a file response (possibly a byte range);
	/// - a redirect;
	/// - a Markdown preview page (via `html::serve`);
	/// - a short error message with the matching status code.
	///
	/// # Errors
	///
	/// Propagates any `FileServerError` from `internal::serve_all`,
	/// `html::serve_root`, `serve_dir_json`, `html::serve_dir`, or
	/// `serve_file`.

	#[instrument (level = "debug", skip (self, headers))]
	pub async fn serve_all (
		&self,
		method: Method,
		uri: &str,
		headers: &HashMap <String, Vec <u8>>
	)
	-> Result <Response, FileServerError>
	{
		use internal::{
			OutputFormat,
			Response::*,
		};
		
		// Builds a response with only a status code and a body; no headers
		// are set.
		fn serve_error <S: Into <Vec <u8>>> (
			status_code: StatusCode,
			msg: S
		)
		-> Response
		{
			let mut resp = Response::default ();
			resp.status_code (status_code);
			resp.body_bytes (msg.into ());
			resp
		}
		
		// With no configured root, serve the current working directory.
		let default_root = PathBuf::from ("./");
		let root: &std::path::Path = self.config.file_server_root
		.as_ref ()
		.unwrap_or (&default_root);
		
		Ok (match internal::serve_all (root, method, uri, headers, self.hidden_path.as_deref ()).await? {
			// Favicon requests get a plain 404.
			Favicon => serve_error (StatusCode::NotFound, "Not found\n"),
			Forbidden => serve_error (StatusCode::Forbidden, "403 Forbidden\n"),
			MethodNotAllowed => serve_error (StatusCode::MethodNotAllowed, "Unsupported method\n"),
			NotFound => serve_error (StatusCode::NotFound, "404 Not Found\nAre you missing a trailing slash?\n"),
			RangeNotSatisfiable (file_len) => {
				// `bytes */<len>` reports the file's actual length so the
				// client can retry with a valid range.
				let mut resp = Response::default ();
				resp.status_code (StatusCode::RangeNotSatisfiable)
				.header ("content-range".to_string (), format! ("bytes */{}", file_len).into_bytes ());
				resp
			},
			Redirect (location) => {
				let mut resp = Response::default ();
				resp.status_code (StatusCode::TemporaryRedirect)
				.header ("location".to_string (), location.into_bytes ());
				resp.body_bytes (b"Redirecting...\n".to_vec ());
				resp
			},
			InvalidQuery => serve_error (StatusCode::BadRequest, "Query is invalid for this object\n"),
			
			Root => html::serve_root (self).await?,
			ServeDir (internal::ServeDirParams {
				path,
				dir,
				format
			}) => match format {
				OutputFormat::Json => serve_dir_json (dir.into_inner ()).await?,
				OutputFormat::Html => html::serve_dir (&self.handlebars, &self.metrics_startup, path.to_string_lossy (), dir.into_inner ()).await?,
			},
			// The raw If-None-Match header bytes, if any, are passed through
			// for comparison with the file's ETag.
			ServeFile (internal::ServeFileParams {
				file,
				send_body, 
				range, 
			}) => serve_file (file.into_inner (), send_body, range, headers.get ("if-none-match").map (|v| &v[..])).await?,
			MarkdownErr (e) => {
				// The `markdown` module only exists with the `markdown`
				// feature, so exactly one of the two blocks below is compiled.
				// With the feature on, this arm returns early from the function.
				#[cfg (feature = "markdown")]
				{
					use markdown::Error::*;
					let e = e.inner;
					let code = match &e {
						TooBig => StatusCode::InternalServerError,
						//NotMarkdown => serve_error (StatusCode::BadRequest, "File is not Markdown"),
						NotUtf8 => StatusCode::BadRequest,
					};
					
					return Ok (serve_error (code, e.to_string ()));
				}
				
				#[cfg (not (feature = "markdown"))]
				{
					// Consume `e` so the binding isn't reported as unused.
					let _e = e;
					serve_error (StatusCode::BadRequest, "Markdown feature is disabled")
				}
			},
			MarkdownPreview (s) => html::serve (s),
		})
	}
}

/// Loads the server's Handlebars templates from
/// `<asset_root>/handlebars/server`.
///
/// Registers `file_server_dir` and `file_server_root`, with strict mode
/// enabled (see Handlebars `set_strict_mode`).
///
/// # Errors
///
/// Fails if either template file can't be registered.
fn load_templates (
	asset_root: &Path
) 
-> Result <Handlebars <'static>, anyhow::Error>
{
	// (template name, file name under handlebars/server)
	const TEMPLATES: [(&str, &str); 2] = [
		("file_server_dir", "file_server_dir.html"),
		("file_server_root", "file_server_root.html"),
	];
	
	let template_dir = asset_root.join ("handlebars/server");
	
	let mut registry = Handlebars::new ();
	registry.set_strict_mode (true);
	
	for &(name, file_name) in &TEMPLATES {
		registry.register_template_file (name, template_dir.join (file_name))?;
	}
	
	Ok (registry)
}

/// Formats a byte count for people, e.g. `"512 B"`, `"3 KiB"`, `"2 GiB"`.
///
/// Counts under 1 KiB are shown exactly. Larger counts are rounded half-up
/// in the smallest of KiB or MiB that keeps the number below 1024.
/// Otherwise they are shown in GiB, which is the largest unit, so very large
/// counts show as many GiB.
fn pretty_print_bytes (b: u64) -> String {
	const KIB: u64 = 1024;
	const MIB: u64 = KIB * 1024;
	const GIB: u64 = MIB * 1024;
	
	// Equal to `(b + unit / 2) / unit` (round half up), but can't overflow
	// when `b` is near `u64::MAX`.
	let round_to = |unit: u64| b / unit + u64::from (b % unit >= unit / 2);
	
	if b < KIB {
		format! ("{} B", b)
	}
	else if round_to (KIB) < 1024 {
		format! ("{} KiB", round_to (KIB))
	}
	else if round_to (MIB) < 1024 {
		format! ("{} MiB", round_to (MIB))
	}
	else {
		format! ("{} GiB", round_to (GIB))
	}
}

#[cfg (test)]
mod tests;