digitalis-server 0.0.0

Base server module for Foxflove
Documentation
use std::{
    sync::{Arc, RwLock},
    time::{Duration, SystemTime, UNIX_EPOCH},
};

use digitalis_protobuf::{
    base::Timestamp, protobuf::MessageFull, Color, CubePrimitive, Pose, Quaternion, SceneEntity,
    SceneUpdate, Vector3,
};
use digitalis_server::{
    channel::{Channel, ChannelQueue},
    client::Client,
    core::{
        client::ClientMessage,
        common::{AdvertiseChannel, ChannelId, MessageEncoding},
        server, Control, DigitalisResult,
    },
    server::{DigitalisServer, ServerOption},
};
use tokio::time;

#[derive(Debug, Default)]
struct Server {
    channels: RwLock<Vec<Option<(AdvertiseChannel, ChannelQueue)>>>,
}

impl Server {
    fn new() -> Self {
        Self::default()
    }

    fn add_protobuf_channel<T: MessageFull>(&self, topic: &str) -> ChannelQueue {
        let mut channels = self.channels.write().unwrap();
        let id = channels.len() as ChannelId;
        let sender = Channel::start(id);

        channels.insert(
            0,
            Some((AdvertiseChannel::protobuf::<T>(id, topic), sender.clone())),
        );

        sender
    }

    async fn listen(self: &Arc<Self>, option: ServerOption) {
        let listener = option.tcp_connect().await.unwrap();

        let mut handles = vec![];

        let mut client_id = 0;
        while let Ok((stream, _)) = listener.accept().await {
            let server_clone = Arc::clone(self);
            let handle =
                tokio::spawn(
                    async move { server_clone.accept_connection(client_id, stream).await },
                );

            handles.push(handle);
            client_id += 1;
        }

        for handle in handles {
            handle.await.unwrap();
        }
    }
}

impl DigitalisServer for Server {
    fn on_open(&self, client: &Client) -> DigitalisResult<()> {
        client.nonblocking_send(
            server::ServerInfo {
                name: "digitalis-server-test".into(),
                supported_encodings: vec![MessageEncoding::Protobuf],
                ..Default::default()
            }
            .into_message()?,
        )?;

        client.nonblocking_send(
            server::Advertise {
                channels: self
                    .channels
                    .read()
                    .unwrap()
                    .iter()
                    .filter_map(|v| v.as_ref())
                    .map(|(channel, _)| channel.clone())
                    .collect(),
            }
            .into_message()?,
        )?;

        Ok(())
    }

    fn handle_message(&self, client: &Client, msg: ClientMessage) -> DigitalisResult<Control> {
        use ClientMessage::*;
        match msg {
            Subscribe(msg) => {
                let channels = self.channels.read().unwrap();
                for sub in msg.subscriptions {
                    if let Some(Some((_, channel_sender))) = channels.get(sub.channel_id as usize) {
                        channel_sender.subscribe(client.clone(), sub.id).unwrap()
                    }
                }
            }

            Unsubscribe(msg) => {
                let ids = Arc::new(msg.subscription_ids);

                for (_, sender) in self
                    .channels
                    .read()
                    .unwrap()
                    .iter()
                    .filter_map(|v| v.as_ref())
                {
                    sender.unsubscribe(client.id(), Arc::clone(&ids)).unwrap()
                }
            }

            Close => {
                for (_, sender) in self
                    .channels
                    .read()
                    .unwrap()
                    .iter()
                    .filter_map(|v| v.as_ref())
                {
                    sender.unsubscribe_all(client.id()).unwrap()
                }
                return Ok(Control::Exit);
            }

            _ => {
                log::warn!("unhandled message: {:?}", msg);
            }
        }

        Ok(Control::Continue)
    }
}

fn now() -> Timestamp {
    let now = SystemTime::now();
    let duration = now.duration_since(UNIX_EPOCH).unwrap();
    Timestamp {
        seconds: duration.as_secs() as i64,
        nanos: duration.subsec_nanos() as i32,
        ..Default::default()
    }
}

fn cube(t: &Timestamp) -> CubePrimitive {
    let angle = (t.seconds as f64 + t.nanos as f64 * 1e-9) * 0.5;

    CubePrimitive {
        size: Some(Vector3 {
            x: 1.,
            y: 1.,
            z: 1.,
            ..Default::default()
        })
        .into(),
        pose: Some(Pose {
            position: Some(Vector3 {
                x: 2.,
                y: 0.,
                z: 0.,
                ..Default::default()
            })
            .into(),
            orientation: Some(Quaternion {
                x: 0.,
                y: 0.,
                z: (angle * 0.5).sin(),
                w: (angle * 0.5).cos(),
                ..Default::default()
            })
            .into(),
            ..Default::default()
        })
        .into(),
        color: Some(Color {
            r: 0.6,
            g: 0.2,
            b: 1.,
            a: 1.,
            ..Default::default()
        })
        .into(),
        ..Default::default()
    }
}

#[tokio::main]
async fn main() {
    env_logger::init();

    let server = Arc::new(Server::new());

    let handle = {
        let server = Arc::clone(&server);
        let sender = server.add_protobuf_channel::<SceneUpdate>("example_msg");

        tokio::spawn(async move {
            let mut interval = time::interval(Duration::from_millis(50));

            loop {
                let now = now();
                let msg = SceneUpdate {
                    entities: vec![SceneEntity {
                        frame_id: "root".into(),
                        timestamp: Some(now.clone()).into(),
                        cubes: vec![cube(&now)],
                        ..Default::default()
                    }],
                    ..Default::default()
                };
                sender
                    .broadcast(msg, now.seconds as u64 * 1_000_000_000 + now.nanos as u64)
                    .unwrap();

                interval.tick().await;
            }
        })
    };

    server.listen(ServerOption::default()).await;
    handle.await.unwrap();
}