techne-server 0.1.0

A simple, handmade MCP server focused on type-safety
Documentation
use crate::transport::{self, Action, Transport};

use futures::channel::mpsc;
use futures::channel::oneshot;
use futures::stream;
use futures::{SinkExt, Stream, StreamExt};
use http::StatusCode;
use http::header::{self, HeaderValue};
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Empty};
use http_body_util::{Full, StreamBody};
use hyper::body::{Bytes, Frame, Incoming};
use hyper::service::service_fn;
use hyper_util::rt;
use hyper_util::server::conn::auto;
use tokio::net;
use tokio::task;

use std::io;

/// A [`Transport`] that produces [`Action`]s from incoming HTTP requests.
///
/// Created with [`Http::bind`]. Requests are turned into actions by
/// background tasks and handed over through an internal channel.
pub struct Http {
    /// Receiving end of the channel fed by the listener and connection tasks.
    ///
    /// An `Err` item is sent when the listener fails to accept a connection.
    connections: mpsc::Receiver<io::Result<Action>>,
}

impl Http {
    /// Binds a TCP listener to `address` and starts accepting connections in
    /// the background.
    ///
    /// A detached task accepts connections in a loop; every accepted
    /// connection is served on its own detached task, which forwards the
    /// resulting actions through a channel with a buffer of 10. Tasks are
    /// spawned with `tokio::task::spawn`, so a Tokio runtime must be running.
    ///
    /// If accepting a connection fails, the error is logged, forwarded through
    /// the channel, and the accept loop stops.
    ///
    /// # Errors
    ///
    /// Returns the underlying I/O error if the listener cannot be bound.
    pub async fn bind(address: impl net::ToSocketAddrs) -> io::Result<Self> {
        let listener = net::TcpListener::bind(address).await?;
        let (mut actions, connections) = mpsc::channel(10);

        let accept_loop = async move {
            loop {
                let (socket, _peer) = match listener.accept().await {
                    Ok(accepted) => accepted,
                    Err(error) => {
                        log::error!("{error}");
                        // Best effort: the receiving side may already be gone.
                        let _ = actions.send(Err(error)).await;

                        break;
                    }
                };

                let io = rt::TokioIo::new(socket);

                // Each connection task owns its own handle to the channel.
                let actions = actions.clone();

                let connection = async move {
                    // The service is invoked once per request, so every call
                    // gets a fresh clone of the sender.
                    let service = service_fn(|request| serve(request, actions.clone()));

                    let served = auto::Builder::new(rt::TokioExecutor::new())
                        .serve_connection_with_upgrades(io, service)
                        .await;

                    if let Err(error) = served {
                        log::error!("{error}");
                    }
                };

                // Dropping the handle detaches the task; it keeps running.
                drop(task::spawn(connection));
            }
        };

        drop(task::spawn(accept_loop));

        Ok(Self { connections })
    }
}

impl Transport for Http {
    /// Waits for the next item produced by the HTTP worker tasks.
    ///
    /// # Errors
    ///
    /// Returns any error forwarded through the channel, or a
    /// [`io::ErrorKind::BrokenPipe`] error once the channel has been closed
    /// and yields no more items.
    async fn accept(&mut self) -> io::Result<Action> {
        self.connections.next().await.unwrap_or_else(|| {
            Err(io::Error::new(
                io::ErrorKind::BrokenPipe,
                "http worker stopped running",
            ))
        })
    }
}

/// Routes a single HTTP request and turns it into an [`Action`].
///
/// * `GET /` sends an [`Action::Subscribe`].
/// * `POST /` reads the whole request body into memory and sends an
///   [`Action::Handle`] carrying it.
/// * Anything else is answered with `404 Not Found`.
///
/// The reply is awaited on a oneshot channel whose sender travels with the
/// action. A failure to send the action is ignored here; the sender is then
/// dropped with the action and `handle` reports the cancelled reply.
///
/// # Errors
///
/// Returns the [`hyper::Error`] raised while reading the body of a `POST`.
async fn serve(
    request: hyper::Request<Incoming>,
    mut actions: mpsc::Sender<io::Result<Action>>,
) -> Result<Response, hyper::Error> {
    let (sender, result) = oneshot::channel();

    let action = match (request.method(), request.uri().path()) {
        (&http::Method::GET, "/") => Action::Subscribe(sender),
        (&http::Method::POST, "/") => {
            let bytes = request.into_body().collect().await?.to_bytes();

            Action::Handle(bytes, sender)
        }
        _ => return Ok(not_found()),
    };

    let _ = actions.send(Ok(action)).await;

    Ok(handle(result).await)
}

/// Waits for the transport's reply and converts it into an HTTP response.
///
/// | Reply         | Response                                   |
/// |---------------|--------------------------------------------|
/// | `Send`        | `200` with a JSON body                     |
/// | `Stream`      | `200` with an event-stream body            |
/// | `Accept`      | `202 Accepted`, empty body                 |
/// | `Reject`      | `400 Bad Request`, empty body              |
/// | `Unsupported` | `405 Method Not Allowed`, empty body       |
///
/// If the sending half is dropped without replying, the response is
/// `500 Internal Server Error`.
async fn handle(result: oneshot::Receiver<transport::Result>) -> Response {
    let outcome = match result.await {
        Ok(outcome) => outcome,
        Err(_canceled) => return internal_error(),
    };

    match outcome {
        transport::Result::Send(message) => ok(message),
        transport::Result::Stream(messages) => stream(messages),
        transport::Result::Accept => status(StatusCode::ACCEPTED),
        transport::Result::Reject => bad_request(),
        transport::Result::Unsupported => status(StatusCode::METHOD_NOT_ALLOWED),
    }
}

fn empty() -> Response {
    Response::new(
        Empty::<Bytes>::new()
            .map_err(|never| match never {})
            .boxed(),
    )
}

fn ok(bytes: Bytes) -> Response {
    let mut response = Response::new(Full::new(bytes).map_err(|never| match never {}).boxed());

    response.headers_mut().insert(
        header::CONTENT_TYPE,
        HeaderValue::from_static("application/json"),
    );

    response
}

fn stream(stream: impl Stream<Item = Bytes> + Send + Sync + 'static) -> Response {
    let mut response = Response::new(BoxBody::new(StreamBody::new(
        stream
            .flat_map(|bytes| {
                stream::iter([
                    Bytes::from_static(b"data:"),
                    bytes,
                    Bytes::from_static(b"\n\n"),
                ])
                .map(Frame::data)
            })
            .map(Ok),
    )));

    response.headers_mut().insert(
        header::CONTENT_TYPE,
        HeaderValue::from_static("text/event-stream"),
    );

    response
}

/// Builds an empty `400 Bad Request` response.
fn bad_request() -> Response {
    status(StatusCode::BAD_REQUEST)
}

/// Builds an empty `404 Not Found` response.
fn not_found() -> Response {
    status(StatusCode::NOT_FOUND)
}

/// Builds an empty `500 Internal Server Error` response.
fn internal_error() -> Response {
    status(StatusCode::INTERNAL_SERVER_ERROR)
}

fn status(code: StatusCode) -> Response {
    let mut response = empty();

    *response.status_mut() = code;

    response
}

/// The response type produced by every handler in this module: a hyper
/// response with a type-erased body of [`Bytes`] that may fail with
/// [`hyper::Error`].
type Response = hyper::Response<BoxBody<Bytes, hyper::Error>>;