pica 0.1.7

Pica is a virtual UWB Controller implementing the FiRa UCI specification.
Documentation
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::convert::Infallible;
use std::net::{Ipv4Addr, SocketAddrV4};

use anyhow::{Context, Result};
use hyper::service::{make_service_fn, service_fn};
use hyper::{body, Body, Request, Response, Server, StatusCode as HttpStatusCode};
use serde::{Deserialize, Serialize};
use serde_json::error::Category as SerdeErrorCategory;
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio_stream::{wrappers::BroadcastStream, StreamExt};

use pica::{
    Category, MacAddress, PicaCommand, PicaCommandError, PicaCommandStatus, PicaEvent, Position,
};
use PicaEvent::{DeviceAdded, DeviceRemoved, DeviceUpdated, NeighborUpdated};

const STATIC_FILES: &[(&str, &str, &str)] = &[
    ("/", "text/html", include_str!("../../../static/index.html")),
    (
        "/openapi",
        "text/html",
        include_str!("../../../static/openapi.html"),
    ),
    (
        "/openapi.yaml",
        "text/yaml",
        include_str!("../../../static/openapi.yaml"),
    ),
    (
        "/src/components/Map.js",
        "application/javascript",
        include_str!("../../../static/src/components/Map.js"),
    ),
    (
        "/src/components/DeviceInfo.js",
        "application/javascript",
        include_str!("../../../static/src/components/DeviceInfo.js"),
    ),
    (
        "/src/components/Orientation.js",
        "application/javascript",
        include_str!("../../../static/src/components/Orientation.js"),
    ),
];

#[derive(Deserialize)]
struct PositionBody {
    x: i16,
    y: i16,
    z: i16,
    yaw: i16,
    pitch: i8,
    roll: i16,
}

macro_rules! position {
    ($body: ident) => {
        position!($body, false)
    };
    ($body: ident, $mandatory: ident) => {
        match serde_json::from_slice::<PositionBody>(&$body) {
            Ok(body) => Position::new(body.x, body.y, body.z, body.yaw, body.pitch, body.roll),
            Err(err) => {
                if !$mandatory && err.classify() == SerdeErrorCategory::Eof {
                    Position::default()
                } else {
                    let reason = format!("Error while deserializing position: {}", err);
                    println!("{}", reason);
                    return Ok(Response::builder().status(406).body(reason.into()).unwrap());
                }
            }
        }
    };
}

macro_rules! mac_address {
    ($mac_address: ident) => {
        match MacAddress::new($mac_address.to_string()) {
            Ok(mac_address) => mac_address,
            Err(err) => {
                let reason = format!("Error mac_address: {}", err);
                println!("{}", reason);
                return Ok(Response::builder().status(406).body(reason.into()).unwrap());
            }
        }
    };
}

#[derive(Debug, Serialize, Clone)]
struct Device {
    pub category: Category,
    pub mac_address: String,
    #[serde(flatten)]
    pub position: Position,
}

fn event_name(event: &PicaEvent) -> &'static str {
    match event {
        DeviceAdded { .. } => "device-added",
        DeviceRemoved { .. } => "device-removed",
        DeviceUpdated { .. } => "device-updated",
        NeighborUpdated { .. } => "neighbor-updated",
    }
}

async fn handle(
    mut req: Request<Body>,
    tx: mpsc::Sender<PicaCommand>,
    events: broadcast::Receiver<PicaEvent>,
) -> Result<Response<Body>, Infallible> {
    let static_file = STATIC_FILES
        .iter()
        .find(|(path, _, _)| req.uri().path() == *path);

    if let Some((_, mime, content)) = static_file {
        return Ok(Response::builder()
            .header("content-type", *mime)
            .body((*content).into())
            .unwrap());
    }

    let body = body::to_bytes(req.body_mut()).await.unwrap();
    let (pica_cmd_rsp_tx, pica_cmd_rsp_rx) = oneshot::channel::<PicaCommandStatus>();

    let send_cmd = |pica_cmd| async {
        println!("PicaCommand: {}", pica_cmd);
        tx.send(pica_cmd).await.unwrap();
        let (status, description) = match pica_cmd_rsp_rx.await {
            Ok(Ok(_)) => (HttpStatusCode::OK, "success".into()),
            Ok(Err(err)) => (
                match err {
                    PicaCommandError::DeviceAlreadyExists(_) => HttpStatusCode::CONFLICT,
                    PicaCommandError::DeviceNotFound(_) => HttpStatusCode::NOT_FOUND,
                },
                format!("{}", err),
            ),
            Err(err) => (
                HttpStatusCode::INTERNAL_SERVER_ERROR,
                format!("Error getting command response: {}", err),
            ),
        };
        println!("  status: {}, {}", status, description);
        Response::builder()
            .status(status)
            .body(description.into())
            .unwrap()
    };

    match req
        .uri_mut()
        .path()
        .trim_start_matches('/')
        .split('/')
        .collect::<Vec<_>>()[..]
    {
        ["events"] => {
            let stream = BroadcastStream::new(events).map(|result| {
                result.map(|event| {
                    format!(
                        "event: {}\ndata: {}\n\n",
                        event_name(&event),
                        serde_json::to_string(&event).unwrap()
                    )
                })
            });
            return Ok(Response::builder()
                .header("content-type", "text/event-stream")
                .body(Body::wrap_stream(stream))
                .unwrap());
        }
        ["init-uci-device", mac_address] => {
            return Ok(send_cmd(PicaCommand::InitUciDevice(
                mac_address!(mac_address),
                position!(body),
                pica_cmd_rsp_tx,
            ))
            .await);
        }
        ["set-position", mac_address] => {
            return Ok(send_cmd(PicaCommand::SetPosition(
                mac_address!(mac_address),
                position!(body),
                pica_cmd_rsp_tx,
            ))
            .await);
        }
        ["create-anchor", mac_address] => {
            return Ok(send_cmd(PicaCommand::CreateAnchor(
                mac_address!(mac_address),
                position!(body),
                pica_cmd_rsp_tx,
            ))
            .await);
        }
        ["destroy-anchor", mac_address] => {
            return Ok(send_cmd(PicaCommand::DestroyAnchor(
                mac_address!(mac_address),
                pica_cmd_rsp_tx,
            ))
            .await);
        }
        ["get-state"] => {
            #[derive(Serialize)]
            struct GetStateResponse {
                devices: Vec<Device>,
            }
            println!("PicaCommand: GetState");
            let (state_tx, state_rx) = oneshot::channel::<Vec<_>>();
            tx.send(PicaCommand::GetState(state_tx)).await.unwrap();
            let devices = match state_rx.await {
                Ok(devices) => GetStateResponse {
                    devices: devices
                        .into_iter()
                        .map(|(category, mac_address, position)| Device {
                            category,
                            mac_address: mac_address.into(),
                            position,
                        })
                        .collect(),
                },
                Err(_) => GetStateResponse { devices: vec![] },
            };
            let body = serde_json::to_string(&devices).unwrap();
            return Ok(Response::builder().status(200).body(body.into()).unwrap());
        }

        _ => (),
    }

    Ok(Response::builder().status(404).body("".into()).unwrap())
}

pub async fn serve(
    tx: mpsc::Sender<PicaCommand>,
    events: broadcast::Sender<PicaEvent>,
    web_port: u16,
) -> Result<()> {
    let addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, web_port);

    let make_svc = make_service_fn(move |_conn| {
        let tx = tx.clone();
        let events = events.clone();
        async move {
            Ok::<_, Infallible>(service_fn(move |req| {
                handle(req, tx.clone(), events.subscribe())
            }))
        }
    });

    let server = Server::bind(&addr.into()).serve(make_svc);

    println!("Pica: Web server started on http://0.0.0.0:{}", web_port);

    server.await.context("Web Server Error")
}