futuresdr 0.0.39

An Experimental Async SDR Runtime for Heterogeneous Architectures.
Documentation
//! Remote Control through REST API
use axum::Json;
use axum::Router;
use axum::extract::Path;
use axum::extract::State;
use axum::http::StatusCode;
use axum::routing::get;
use axum::routing::get_service;
use futures::channel::oneshot;
use std::net::SocketAddr;
use std::path;
use std::thread::JoinHandle;
use tokio::net::TcpListener;
use tower_http::cors::CorsLayer;
use tower_http::services::ServeDir;

use crate::runtime::BlockDescription;
use crate::runtime::BlockId;
use crate::runtime::FlowgraphDescription;
use crate::runtime::FlowgraphId;
use crate::runtime::Pmt;
use crate::runtime::PortId;
use crate::runtime::RuntimeHandle;
use crate::runtime::config;

macro_rules! relative {
    ($path:expr_2021) => {
        if cfg!(windows) {
            concat!(env!("CARGO_MANIFEST_DIR"), "\\", $path)
        } else {
            concat!(env!("CARGO_MANIFEST_DIR"), "/", $path)
        }
    };
}

async fn flowgraphs(State(rt): State<RuntimeHandle>) -> Json<Vec<FlowgraphId>> {
    Json::from(rt.get_flowgraphs().await)
}

async fn flowgraph_description(
    Path(fg): Path<usize>,
    State(rt): State<RuntimeHandle>,
) -> Result<Json<FlowgraphDescription>, StatusCode> {
    let fg = rt.get_flowgraph(FlowgraphId(fg));
    if let Some(mut fg) = fg.await {
        if let Ok(d) = fg.description().await {
            return Ok(Json::from(d));
        }
    }
    Err(StatusCode::BAD_REQUEST)
}

async fn block_description(
    Path((fg, blk)): Path<(usize, BlockId)>,
    State(rt): State<RuntimeHandle>,
) -> Result<Json<BlockDescription>, StatusCode> {
    let fg = rt.get_flowgraph(FlowgraphId(fg));
    if let Some(mut fg) = fg.await {
        if let Ok(d) = fg.block_description(blk).await {
            return Ok(Json::from(d));
        }
    }

    Err(StatusCode::BAD_REQUEST)
}

async fn handler_id(
    Path((fg, blk, handler)): Path<(usize, BlockId, PortId)>,
    State(rt): State<RuntimeHandle>,
) -> Result<Json<Pmt>, StatusCode> {
    let fg = rt.get_flowgraph(FlowgraphId(fg));
    if let Some(mut fg) = fg.await {
        if let Ok(ret) = fg.callback(blk, handler, Pmt::Null).await {
            return Ok(Json::from(ret));
        }
    }

    Err(StatusCode::BAD_REQUEST)
}

async fn handler_id_post(
    Path((fg, blk, handler)): Path<(usize, BlockId, PortId)>,
    State(rt): State<RuntimeHandle>,
    Json(pmt): Json<Pmt>,
) -> Result<Json<Pmt>, StatusCode> {
    let fg = rt.get_flowgraph(FlowgraphId(fg));
    if let Some(mut fg) = fg.await {
        if let Ok(ret) = fg.callback(blk, handler, pmt).await {
            return Ok(Json::from(ret));
        }
    }

    Err(StatusCode::BAD_REQUEST)
}

pub struct ControlPort {
    thread: Option<(oneshot::Sender<()>, JoinHandle<()>)>,
    handle: RuntimeHandle,
}

impl ControlPort {
    pub fn new(handle: RuntimeHandle, routes: Router) -> Self {
        let mut cp = ControlPort {
            handle,
            thread: None,
        };
        cp.start(Some(routes));
        cp
    }

    fn start(&mut self, custom_routes: Option<Router>) {
        if !config::config().ctrlport_enable {
            return;
        }

        if self.thread.is_some() {
            return;
        }

        let mut app = Router::new()
            .route("/api/fg/", get(flowgraphs))
            .route("/api/fg/{fg}/", get(flowgraph_description))
            .route("/api/fg/{fg}/block/{blk}/", get(block_description))
            .route(
                "/api/fg/{fg}/block/{blk}/call/{handler}/",
                get(handler_id).post(handler_id_post),
            )
            .layer(CorsLayer::permissive())
            .with_state(self.handle.clone());

        if let Some(c) = custom_routes {
            app = app.merge(c);
        }

        let frontend = if let Some(ref p) = config::config().frontend_path {
            Some(ServeDir::new(p))
        } else if path::Path::new(relative!("crates/prophecy/dist")).is_dir() {
            Some(ServeDir::new(relative!("crates/prophecy/dist")))
        } else {
            None
        };

        if let Some(service) = frontend {
            app = app.fallback_service(get_service(service));
        }

        let (tx_shutdown, rx_shutdown) = oneshot::channel::<()>();

        let handle = std::thread::spawn(move || {
            let runtime = match tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
            {
                Ok(runtime) => runtime,
                Err(e) => {
                    warn!("failed to start Tokio runtime {e:?}");
                    return;
                }
            };

            runtime.spawn(async move {
                if let Ok(addr) = config::config().ctrlport_bind.parse::<SocketAddr>() {
                    match TcpListener::bind(&addr).await {
                        Ok(listener) => {
                            debug!("Listening on {}", addr);
                            if let Err(e) = axum::serve(listener, app.into_make_service()).await {
                                warn!("axum server failed {e:?}");
                            }
                        }
                        _ => {
                            warn!("CtrlPort address {addr} already in use");
                        }
                    }
                } else {
                    warn!(
                        "failed to parse socket addr {}",
                        config::config().ctrlport_bind
                    );
                }
            });

            runtime.block_on(async move {
                let _ = rx_shutdown.await;
            });
        });

        self.thread = Some((tx_shutdown, handle));
    }
}

impl Drop for ControlPort {
    fn drop(&mut self) {
        if let Some((tx, handle)) = self.thread.take() {
            let _ = tx.send(());
            let _ = handle.join();
        }
    }
}