use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::{Arc, Mutex};
use std::thread;
use crate::{log_json, Ledger, Shard};
pub struct Server {
listener: TcpListener,
ledger: Arc<Mutex<Ledger>>,
ingests: Arc<Mutex<u64>>,
}
impl Server {
pub fn bind(addr: &str, ledger: Arc<Mutex<Ledger>>) -> std::io::Result<Self> {
let listener = TcpListener::bind(addr)?;
if let Ok(local) = listener.local_addr() {
log_json("bind", &format!("listening {}", local));
}
Ok(Self {
listener,
ledger,
ingests: Arc::new(Mutex::new(0)),
})
}
pub fn run(self) -> std::io::Result<()> {
for stream in self.listener.incoming() {
match stream {
Ok(mut s) => {
let peer = s.peer_addr().ok();
let ledger = self.ledger.clone();
let ingests = self.ingests.clone();
thread::spawn(move || {
if let Some(addr) = peer {
log_json("conn", &format!("accepted connection from {}", addr));
}
let _ = handle_conn(&mut s, ledger, ingests);
});
}
Err(e) => {
log_json("accept_error", &format!("{}", e));
continue;
}
}
}
Ok(())
}
}
fn handle_conn(
stream: &mut TcpStream,
ledger: Arc<Mutex<Ledger>>,
ingests: Arc<Mutex<u64>>,
) -> std::io::Result<()> {
let mut buf = [0u8; 4096];
let n = stream.read(&mut buf)?;
let req = String::from_utf8_lossy(&buf[..n]);
let mut lines = req.lines();
let first = lines.next().unwrap_or("");
let path = first.split_whitespace().nth(1).unwrap_or("/");
let remote = stream
.peer_addr()
.map(|a| a.to_string())
.unwrap_or_else(|_| "unknown".into());
log_json("request", &format!("{} - {}", remote, first));
match path {
"/healthz" => respond(stream, 200, "ok"),
"/metrics" => {
let count = *ingests.lock().unwrap();
let body = format!(
"ingests {}\nledger_digest {}\n",
count,
ledger.lock().unwrap().digest()
);
respond(stream, 200, &body)
}
_ if path.starts_with("/mint?") => {
let q = &path[6..];
let (label, seed) = parse_qs(q);
match Shard::mint(&label, seed) {
Ok(sh) => respond(
stream,
200,
&format!("{} {} {} {}\n", sh.label, sh.seed, sh.checksum, sh.energy),
),
Err(e) => respond(stream, 400, &e),
}
}
_ if path.starts_with("/ingest?") => {
let q = &path[8..];
let (label, seed) = parse_qs(q);
let checksum = parse_param(q, "checksum")
.and_then(|x| x.parse::<u128>().ok())
.unwrap_or(0);
let sh = Shard {
label,
seed,
checksum,
energy: 0,
};
match ledger.lock().unwrap().ingest(sh) {
Ok(_) => {
*ingests.lock().unwrap() += 1;
respond(stream, 200, "ingested\n")
}
Err(e) => respond(stream, 400, &e),
}
}
_ => respond(stream, 404, "not found"),
}
}
fn parse_qs(q: &str) -> (String, u64) {
let label = parse_param(q, "label").unwrap_or_else(|| "default".into());
let seed = parse_param(q, "seed")
.and_then(|x| x.parse::<u64>().ok())
.unwrap_or(0);
(label, seed)
}
fn parse_param(q: &str, k: &str) -> Option<String> {
q.split('&').find_map(|pair| {
let mut it = pair.split('=');
let key = it.next()?;
let val = it.next()?;
if key == k {
Some(url_decode(val))
} else {
None
}
})
}
fn url_decode(s: &str) -> String {
s.replace("%2D", "-").replace("%5F", "_")
}
fn respond(stream: &mut TcpStream, code: u16, body: &str) -> std::io::Result<()> {
let status = match code {
200 => "OK",
400 => "Bad Request",
404 => "Not Found",
_ => "OK",
};
let resp = format!(
"HTTP/1.1 {} {}\r\nContent-Type: text/plain; charset=utf-8\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
code, status, body.len(), body
);
stream.write_all(resp.as_bytes())
}