use crate::vfs::{FileType, VfsAction, VfsRequest, VfsResponse};
use crate::{
get_blob, last_blob, LazyLoadBlob as KiBlob, Message, Request as KiRequest,
Response as KiResponse,
};
pub use http::StatusCode;
use http::{HeaderMap, HeaderName, HeaderValue};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use thiserror::Error;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum HttpServerRequest {
Http(IncomingHttpRequest),
WebSocketOpen {
path: String,
channel_id: u32,
},
WebSocketPush {
channel_id: u32,
message_type: WsMessageType,
},
WebSocketClose(u32),
}
impl HttpServerRequest {
pub fn from_bytes(bytes: &[u8]) -> serde_json::Result<Self> {
serde_json::from_slice(bytes)
}
pub fn request(self) -> Option<IncomingHttpRequest> {
match self {
HttpServerRequest::Http(req) => Some(req),
_ => None,
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct IncomingHttpRequest {
source_socket_addr: Option<String>,
method: String,
url: String,
bound_path: String,
headers: HashMap<String, String>,
url_params: HashMap<String, String>,
query_params: HashMap<String, String>,
}
impl IncomingHttpRequest {
pub fn url(&self) -> Result<url::Url, url::ParseError> {
url::Url::parse(&self.url)
}
pub fn method(&self) -> Result<http::Method, http::method::InvalidMethod> {
http::Method::from_bytes(self.method.as_bytes())
}
pub fn source_socket_addr(&self) -> Result<std::net::SocketAddr, std::net::AddrParseError> {
match &self.source_socket_addr {
Some(addr) => addr.parse(),
None => "".parse(),
}
}
pub fn bound_path(&self, process_id_to_strip: Option<&str>) -> &str {
match process_id_to_strip {
Some(process_id) => self
.bound_path
.strip_prefix(&format!("/{}", process_id))
.unwrap_or(&self.bound_path),
None => &self.bound_path,
}
}
pub fn path(&self) -> Result<String, url::ParseError> {
let url = url::Url::parse(&self.url)?;
let Some(path) = url.path_segments() else {
return Err(url::ParseError::InvalidDomainCharacter);
};
let path = path.skip(1).collect::<Vec<&str>>().join("/");
Ok(format!("/{}", path))
}
pub fn headers(&self) -> HeaderMap {
let mut header_map = HeaderMap::new();
for (key, value) in self.headers.iter() {
let key_bytes = key.as_bytes();
let Ok(key_name) = HeaderName::from_bytes(key_bytes) else {
continue;
};
let Ok(value_header) = HeaderValue::from_str(&value) else {
continue;
};
header_map.insert(key_name, value_header);
}
header_map
}
pub fn url_params(&self) -> &HashMap<String, String> {
&self.url_params
}
pub fn query_params(&self) -> &HashMap<String, String> {
&self.query_params
}
}
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
pub enum WsMessageType {
Text,
Binary,
Ping,
Pong,
Close,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum HttpServerAction {
Bind {
path: String,
authenticated: bool,
local_only: bool,
cache: bool,
},
SecureBind {
path: String,
cache: bool,
},
Unbind { path: String },
WebSocketBind {
path: String,
authenticated: bool,
extension: bool,
},
WebSocketSecureBind { path: String, extension: bool },
WebSocketUnbind { path: String },
WebSocketPush {
channel_id: u32,
message_type: WsMessageType,
},
WebSocketExtPushOutgoing {
channel_id: u32,
message_type: WsMessageType,
desired_reply_type: MessageType,
},
WebSocketExtPushData {
id: u64,
kinode_message_type: MessageType,
blob: Vec<u8>,
},
WebSocketClose(u32),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct HttpResponse {
pub status: u16,
pub headers: HashMap<String, String>,
}
impl HttpResponse {
pub fn new<T>(status: T) -> Self
where
T: Into<u16>,
{
Self {
status: status.into(),
headers: HashMap::new(),
}
}
pub fn set_status(mut self, status: u16) -> Self {
self.status = status;
self
}
pub fn header<T, U>(mut self, key: T, value: U) -> Self
where
T: Into<String>,
U: Into<String>,
{
self.headers.insert(key.into(), value.into());
self
}
pub fn set_headers(mut self, headers: HashMap<String, String>) -> Self {
self.headers = headers;
self
}
}
#[derive(Clone, Debug, Error, Serialize, Deserialize)]
pub enum HttpServerError {
#[error("request could not be deserialized to valid HttpServerRequest")]
MalformedRequest,
#[error("action expected blob")]
NoBlob,
#[error("path binding error: invalid source process")]
InvalidSourceProcess,
#[error("WebSocket error: ping/pong message too long")]
WsPingPongTooLong,
#[error("WebSocket error: channel not found")]
WsChannelNotFound,
#[error("timeout")]
Timeout,
#[error("unexpected response from http-server")]
UnexpectedResponse,
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub enum MessageType {
Request,
Response,
}
#[derive(Clone, Debug)]
pub struct HttpServer {
http_paths: HashMap<String, HttpBindingConfig>,
ws_paths: HashMap<String, WsBindingConfig>,
ws_channels: HashMap<String, HashSet<u32>>,
pub timeout: u64,
}
#[derive(Clone, Debug)]
pub struct HttpBindingConfig {
authenticated: bool,
local_only: bool,
secure_subdomain: bool,
static_content: Option<KiBlob>,
}
impl HttpBindingConfig {
pub fn default() -> Self {
Self {
authenticated: true,
local_only: false,
secure_subdomain: false,
static_content: None,
}
}
pub fn new(
authenticated: bool,
local_only: bool,
secure_subdomain: bool,
static_content: Option<KiBlob>,
) -> Self {
Self {
authenticated,
local_only,
secure_subdomain,
static_content,
}
}
pub fn authenticated(mut self, authenticated: bool) -> Self {
self.authenticated = authenticated;
self
}
pub fn local_only(mut self, local_only: bool) -> Self {
self.local_only = local_only;
self
}
pub fn secure_subdomain(mut self, secure_subdomain: bool) -> Self {
self.secure_subdomain = secure_subdomain;
self
}
pub fn static_content(mut self, static_content: Option<KiBlob>) -> Self {
self.static_content = static_content;
self
}
}
#[derive(Clone, Copy, Debug)]
pub struct WsBindingConfig {
authenticated: bool,
secure_subdomain: bool,
extension: bool,
}
impl WsBindingConfig {
pub fn default() -> Self {
Self {
authenticated: true,
secure_subdomain: false,
extension: false,
}
}
pub fn new(authenticated: bool, secure_subdomain: bool, extension: bool) -> Self {
Self {
authenticated,
secure_subdomain,
extension,
}
}
pub fn authenticated(mut self, authenticated: bool) -> Self {
self.authenticated = authenticated;
self
}
pub fn secure_subdomain(mut self, secure_subdomain: bool) -> Self {
self.secure_subdomain = secure_subdomain;
self
}
pub fn extension(mut self, extension: bool) -> Self {
self.extension = extension;
self
}
}
impl HttpServer {
pub fn new(timeout: u64) -> Self {
Self {
http_paths: HashMap::new(),
ws_paths: HashMap::new(),
ws_channels: HashMap::new(),
timeout,
}
}
pub fn bind_http_path<T>(
&mut self,
path: T,
config: HttpBindingConfig,
) -> Result<(), HttpServerError>
where
T: Into<String>,
{
let path: String = path.into();
let cache = config.static_content.is_some();
let req = KiRequest::to(("our", "http-server", "distro", "sys")).body(
serde_json::to_vec(&if config.secure_subdomain {
HttpServerAction::SecureBind {
path: path.clone(),
cache,
}
} else {
HttpServerAction::Bind {
path: path.clone(),
authenticated: config.authenticated,
local_only: config.local_only,
cache,
}
})
.unwrap(),
);
let res = match config.static_content.clone() {
Some(static_content) => req
.blob(static_content)
.send_and_await_response(self.timeout),
None => req.send_and_await_response(self.timeout),
};
let Ok(Message::Response { body, .. }) = res.unwrap() else {
return Err(HttpServerError::Timeout);
};
let Ok(resp) = serde_json::from_slice::<Result<(), HttpServerError>>(&body) else {
return Err(HttpServerError::UnexpectedResponse);
};
if resp.is_ok() {
self.http_paths.insert(path, config);
}
resp
}
pub fn bind_ws_path<T>(
&mut self,
path: T,
config: WsBindingConfig,
) -> Result<(), HttpServerError>
where
T: Into<String>,
{
let path: String = path.into();
let res = KiRequest::to(("our", "http-server", "distro", "sys"))
.body(if config.secure_subdomain {
serde_json::to_vec(&HttpServerAction::WebSocketSecureBind {
path: path.clone(),
extension: config.extension,
})
.unwrap()
} else {
serde_json::to_vec(&HttpServerAction::WebSocketBind {
path: path.clone(),
authenticated: config.authenticated,
extension: config.extension,
})
.unwrap()
})
.send_and_await_response(self.timeout);
let Ok(Message::Response { body, .. }) = res.unwrap() else {
return Err(HttpServerError::Timeout);
};
let Ok(resp) = serde_json::from_slice::<Result<(), HttpServerError>>(&body) else {
return Err(HttpServerError::UnexpectedResponse);
};
if resp.is_ok() {
self.ws_paths.insert(path, config);
}
resp
}
pub fn bind_http_static_path<T>(
&mut self,
path: T,
authenticated: bool,
local_only: bool,
content_type: Option<String>,
content: Vec<u8>,
) -> Result<(), HttpServerError>
where
T: Into<String>,
{
let path: String = path.into();
let res = KiRequest::to(("our", "http-server", "distro", "sys"))
.body(
serde_json::to_vec(&HttpServerAction::Bind {
path: path.clone(),
authenticated,
local_only,
cache: true,
})
.unwrap(),
)
.blob(crate::kinode::process::standard::LazyLoadBlob {
mime: content_type.clone(),
bytes: content.clone(),
})
.send_and_await_response(self.timeout)
.unwrap();
let Ok(Message::Response { body, .. }) = res else {
return Err(HttpServerError::Timeout);
};
let Ok(resp) = serde_json::from_slice::<Result<(), HttpServerError>>(&body) else {
return Err(HttpServerError::UnexpectedResponse);
};
if resp.is_ok() {
self.http_paths.insert(
path,
HttpBindingConfig {
authenticated,
local_only,
secure_subdomain: false,
static_content: Some(KiBlob {
mime: content_type,
bytes: content,
}),
},
);
}
resp
}
pub fn secure_bind_http_path<T>(&mut self, path: T) -> Result<(), HttpServerError>
where
T: Into<String>,
{
let path: String = path.into();
let res = KiRequest::to(("our", "http-server", "distro", "sys"))
.body(
serde_json::to_vec(&HttpServerAction::SecureBind {
path: path.clone(),
cache: false,
})
.unwrap(),
)
.send_and_await_response(self.timeout)
.unwrap();
let Ok(Message::Response { body, .. }) = res else {
return Err(HttpServerError::Timeout);
};
let Ok(resp) = serde_json::from_slice::<Result<(), HttpServerError>>(&body) else {
return Err(HttpServerError::UnexpectedResponse);
};
if resp.is_ok() {
self.http_paths.insert(
path,
HttpBindingConfig {
authenticated: true,
local_only: false,
secure_subdomain: true,
static_content: None,
},
);
}
resp
}
pub fn secure_bind_ws_path<T>(&mut self, path: T) -> Result<(), HttpServerError>
where
T: Into<String>,
{
let path: String = path.into();
let res = KiRequest::to(("our", "http-server", "distro", "sys"))
.body(
serde_json::to_vec(&HttpServerAction::WebSocketSecureBind {
path: path.clone(),
extension: false,
})
.unwrap(),
)
.send_and_await_response(self.timeout);
let Ok(Message::Response { body, .. }) = res.unwrap() else {
return Err(HttpServerError::Timeout);
};
let Ok(resp) = serde_json::from_slice::<Result<(), HttpServerError>>(&body) else {
return Err(HttpServerError::UnexpectedResponse);
};
if resp.is_ok() {
self.ws_paths.insert(
path,
WsBindingConfig {
authenticated: true,
secure_subdomain: true,
extension: false,
},
);
}
resp
}
pub fn modify_http_path<T>(
&mut self,
path: &str,
config: HttpBindingConfig,
) -> Result<(), HttpServerError>
where
T: Into<String>,
{
let entry = self
.http_paths
.get_mut(path)
.ok_or(HttpServerError::MalformedRequest)?;
let res = KiRequest::to(("our", "http-server", "distro", "sys"))
.body(
serde_json::to_vec(&HttpServerAction::Bind {
path: path.to_string(),
authenticated: config.authenticated,
local_only: config.local_only,
cache: true,
})
.unwrap(),
)
.send_and_await_response(self.timeout)
.unwrap();
let Ok(Message::Response { body, .. }) = res else {
return Err(HttpServerError::Timeout);
};
let Ok(resp) = serde_json::from_slice::<Result<(), HttpServerError>>(&body) else {
return Err(HttpServerError::UnexpectedResponse);
};
if resp.is_ok() {
entry.authenticated = config.authenticated;
entry.local_only = config.local_only;
entry.secure_subdomain = config.secure_subdomain;
entry.static_content = config.static_content;
}
resp
}
pub fn modify_ws_path(
&mut self,
path: &str,
config: WsBindingConfig,
) -> Result<(), HttpServerError> {
let entry = self
.ws_paths
.get_mut(path)
.ok_or(HttpServerError::MalformedRequest)?;
let res = KiRequest::to(("our", "http-server", "distro", "sys"))
.body(if entry.secure_subdomain {
serde_json::to_vec(&HttpServerAction::WebSocketSecureBind {
path: path.to_string(),
extension: config.extension,
})
.unwrap()
} else {
serde_json::to_vec(&HttpServerAction::WebSocketBind {
path: path.to_string(),
authenticated: config.authenticated,
extension: config.extension,
})
.unwrap()
})
.send_and_await_response(self.timeout)
.unwrap();
let Ok(Message::Response { body, .. }) = res else {
return Err(HttpServerError::Timeout);
};
let Ok(resp) = serde_json::from_slice::<Result<(), HttpServerError>>(&body) else {
return Err(HttpServerError::UnexpectedResponse);
};
if resp.is_ok() {
entry.authenticated = config.authenticated;
entry.secure_subdomain = config.secure_subdomain;
entry.extension = config.extension;
}
resp
}
pub fn unbind_http_path<T>(&mut self, path: T) -> Result<(), HttpServerError>
where
T: Into<String>,
{
let path: String = path.into();
let res = KiRequest::to(("our", "http-server", "distro", "sys"))
.body(serde_json::to_vec(&HttpServerAction::Unbind { path: path.clone() }).unwrap())
.send_and_await_response(self.timeout)
.unwrap();
let Ok(Message::Response { body, .. }) = res else {
return Err(HttpServerError::Timeout);
};
let Ok(resp) = serde_json::from_slice::<Result<(), HttpServerError>>(&body) else {
return Err(HttpServerError::UnexpectedResponse);
};
if resp.is_ok() {
self.http_paths.remove(&path);
}
resp
}
pub fn unbind_ws_path<T>(&mut self, path: T) -> Result<(), HttpServerError>
where
T: Into<String>,
{
let path: String = path.into();
let res = KiRequest::to(("our", "http-server", "distro", "sys"))
.body(
serde_json::to_vec(&HttpServerAction::WebSocketUnbind { path: path.clone() })
.unwrap(),
)
.send_and_await_response(self.timeout)
.unwrap();
let Ok(Message::Response { body, .. }) = res else {
return Err(HttpServerError::Timeout);
};
let Ok(resp) = serde_json::from_slice::<Result<(), HttpServerError>>(&body) else {
return Err(HttpServerError::UnexpectedResponse);
};
if resp.is_ok() {
self.ws_paths.remove(&path);
}
resp
}
pub fn serve_file(
&mut self,
file_path: &str,
paths: Vec<&str>,
config: HttpBindingConfig,
) -> Result<(), HttpServerError> {
let our = crate::our();
let _res = KiRequest::to(("our", "vfs", "distro", "sys"))
.body(
serde_json::to_vec(&VfsRequest {
path: format!(
"/{}/pkg/{}",
our.package_id(),
file_path.trim_start_matches('/')
),
action: VfsAction::Read,
})
.map_err(|_| HttpServerError::MalformedRequest)?,
)
.send_and_await_response(self.timeout)
.unwrap();
let Some(mut blob) = get_blob() else {
return Err(HttpServerError::NoBlob);
};
let content_type = get_mime_type(&file_path);
blob.mime = Some(content_type);
for path in paths {
self.bind_http_path(path, config.clone().static_content(Some(blob.clone())))?;
}
Ok(())
}
pub fn serve_file_raw_path(
&mut self,
file_path: &str,
paths: Vec<&str>,
config: HttpBindingConfig,
) -> Result<(), HttpServerError> {
let _res = KiRequest::to(("our", "vfs", "distro", "sys"))
.body(
serde_json::to_vec(&VfsRequest {
path: file_path.to_string(),
action: VfsAction::Read,
})
.map_err(|_| HttpServerError::MalformedRequest)?,
)
.send_and_await_response(self.timeout)
.unwrap();
let Some(mut blob) = get_blob() else {
return Err(HttpServerError::NoBlob);
};
let content_type = get_mime_type(&file_path);
blob.mime = Some(content_type);
for path in paths {
self.bind_http_path(path, config.clone().static_content(Some(blob.clone())))?;
}
Ok(())
}
pub fn serve_ui(
&mut self,
directory: &str,
roots: Vec<&str>,
config: HttpBindingConfig,
) -> Result<(), HttpServerError> {
let our = crate::our();
let initial_path = format!("{}/pkg/{}", our.package_id(), directory);
let mut queue = std::collections::VecDeque::new();
queue.push_back(initial_path.clone());
while let Some(path) = queue.pop_front() {
let Ok(directory_response) = KiRequest::to(("our", "vfs", "distro", "sys"))
.body(
serde_json::to_vec(&VfsRequest {
path,
action: VfsAction::ReadDir,
})
.unwrap(),
)
.send_and_await_response(self.timeout)
.unwrap()
else {
return Err(HttpServerError::MalformedRequest);
};
let directory_body = serde_json::from_slice::<VfsResponse>(directory_response.body())
.map_err(|_e| HttpServerError::UnexpectedResponse)?;
let VfsResponse::ReadDir(directory_info) = directory_body else {
return Err(HttpServerError::UnexpectedResponse);
};
for entry in directory_info {
match entry.file_type {
FileType::Directory => {
queue.push_back(entry.path);
}
FileType::File => {
if entry.path.ends_with("index.html") {
for root in &roots {
self.serve_file_raw_path(
&entry.path,
vec![root, &entry.path.replace(&initial_path, "")],
config.clone(),
)?;
}
} else {
self.serve_file_raw_path(
&entry.path,
vec![&entry.path.replace(&initial_path, "")],
config.clone(),
)?;
}
}
_ => {
}
}
}
}
Ok(())
}
pub fn handle_websocket_open(&mut self, path: &str, channel_id: u32) {
self.ws_channels
.entry(path.to_string())
.or_insert(HashSet::new())
.insert(channel_id);
}
pub fn handle_websocket_close(&mut self, channel_id: u32) {
self.ws_channels.iter_mut().for_each(|(_, channels)| {
channels.remove(&channel_id);
});
}
pub fn parse_request(&self, body: &[u8]) -> Result<HttpServerRequest, HttpServerError> {
let request = serde_json::from_slice::<HttpServerRequest>(body)
.map_err(|_| HttpServerError::MalformedRequest)?;
Ok(request)
}
pub fn handle_request(
&mut self,
server_request: HttpServerRequest,
mut http_handler: impl FnMut(IncomingHttpRequest) -> (HttpResponse, Option<KiBlob>),
mut ws_handler: impl FnMut(u32, WsMessageType, KiBlob),
) {
match server_request {
HttpServerRequest::Http(http_request) => {
let (response, blob) = http_handler(http_request);
let response = KiResponse::new().body(serde_json::to_vec(&response).unwrap());
if let Some(blob) = blob {
response.blob(blob).send().unwrap();
} else {
response.send().unwrap();
}
}
HttpServerRequest::WebSocketPush {
channel_id,
message_type,
} => ws_handler(channel_id, message_type, last_blob().unwrap_or_default()),
HttpServerRequest::WebSocketOpen { path, channel_id } => {
self.handle_websocket_open(&path, channel_id);
}
HttpServerRequest::WebSocketClose(channel_id) => {
self.handle_websocket_close(channel_id);
}
}
}
pub fn ws_push_all_channels(&self, path: &str, message_type: WsMessageType, blob: KiBlob) {
ws_push_all_channels(&self.ws_channels, path, message_type, blob);
}
pub fn get_ws_channels(&self) -> HashMap<String, HashSet<u32>> {
self.ws_channels.clone()
}
pub fn bind_multiple_http_paths<T: Into<String>>(
&mut self,
paths: Vec<T>,
config: HttpBindingConfig,
) -> Result<(), HttpServerError> {
let mut bound_paths = Vec::new();
for path in paths {
let path_str = path.into();
let result = match config.secure_subdomain {
true => self.secure_bind_http_path(path_str.clone()),
false => self.bind_http_path(path_str.clone(), config.clone()),
};
match result {
Ok(_) => bound_paths.push(path_str),
Err(e) => {
for bound_path in bound_paths {
let _ = self.unbind_http_path(&bound_path);
}
return Err(e);
}
}
}
Ok(())
}
}
pub fn send_response(status: StatusCode, headers: Option<HashMap<String, String>>, body: Vec<u8>) {
KiResponse::new()
.body(
serde_json::to_vec(&HttpResponse {
status: status.as_u16(),
headers: headers.unwrap_or_default(),
})
.unwrap(),
)
.blob_bytes(body)
.send()
.unwrap()
}
pub fn send_ws_push(channel_id: u32, message_type: WsMessageType, blob: KiBlob) {
KiRequest::to(("our", "http-server", "distro", "sys"))
.body(
serde_json::to_vec(&HttpServerRequest::WebSocketPush {
channel_id,
message_type,
})
.unwrap(),
)
.blob(blob)
.send()
.unwrap()
}
pub fn ws_push_all_channels(
ws_channels: &HashMap<String, HashSet<u32>>,
path: &str,
message_type: WsMessageType,
blob: KiBlob,
) {
if let Some(channels) = ws_channels.get(path) {
channels.iter().for_each(|channel_id| {
send_ws_push(*channel_id, message_type, blob.clone());
});
}
}
pub fn get_mime_type(filename: &str) -> String {
let file_path = std::path::Path::new(filename);
let extension = file_path
.extension()
.and_then(|ext| ext.to_str())
.unwrap_or("octet-stream");
mime_guess::from_ext(extension)
.first_or_octet_stream()
.to_string()
}