rsiot-websocket-server 0.0.71

Websocket-сервер
Documentation
//! Компонент для подключения через websocket server.
//!
//! Перенаправляет поток входящих сообщений подключенным вебсокет-клиентам
//!

use tokio::{
    net::TcpListener,
    spawn,
    time::{sleep, Duration},
};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};

use rsiot_component_core::{CmpInOut, ComponentError};
use rsiot_messages_core::{AuthPermissions, MsgDataBound};

use crate::{config::Config, errors::Error};

use super::{async_task_utils::cancellable_task, handle_ws_connection::handle_ws_connection};

pub async fn fn_process<TMessage>(
    input: CmpInOut<TMessage>,
    config: Config<TMessage>,
) -> Result<(), ComponentError>
where
    TMessage: MsgDataBound + 'static,
{
    info!(
        "Component cmp_websocket_server started. Config: {:?}",
        config
    );

    let cancel = CancellationToken::new();

    loop {
        let result = task_main(input.clone(), config.clone(), cancel.clone()).await;
        match result {
            Ok(_) => (),
            Err(err) => error!("{:?}", err),
        }
        info!("Restarting...");
        sleep(Duration::from_secs(2)).await;
    }
}

async fn task_main<TMessage>(
    in_out: CmpInOut<TMessage>,
    config: Config<TMessage>,
    cancel: CancellationToken,
) -> crate::Result<()>
where
    TMessage: MsgDataBound + 'static,
{
    let addr = format!("0.0.0.0:{}", config.port);

    let listener = create_tcp_listener(addr).await?;

    // слушаем порт, при получении запроса создаем новое подключение WS
    while let Ok(stream_and_addr) = listener.accept().await {
        let session_name = format!("session_{}", stream_and_addr.1);
        let future = handle_ws_connection(
            in_out.clone_with_new_id(&session_name, AuthPermissions::FullAccess),
            config.clone(),
            stream_and_addr,
        );
        spawn(cancellable_task(future, cancel.clone()));
    }

    Ok(())
}

async fn create_tcp_listener(addr: String) -> crate::Result<TcpListener> {
    let listener = TcpListener::bind(&addr).await;
    let listener = match listener {
        Ok(value) => value,
        Err(error) => {
            return Err(Error::BindToPort(error));
        }
    };
    info!("Listening on: {}", addr);
    Ok(listener)
}