// dnp3 1.6.0
//
// Rust implementation of DNP3 (IEEE 1815) with idiomatic bindings for C, C++, .NET, and Java
// Documentation
use crate::app::{ConnectStrategy, Listener, RetryStrategy, Shutdown};
use crate::tcp::{ClientState, ConnectOptions, Connector, EndpointList, PostConnectionHandler};
use crate::util::phys::PhysLayer;
use crate::util::session::{RunError, Session, StopReason};
use std::time::Duration;

/// Task that repeatedly connects a session to a remote endpoint and reports
/// each connection state change to a listener until shutdown is requested.
pub(crate) struct ClientTask {
    /// Session that is run over every physical layer the connector produces;
    /// also used to wait for enable and for retry delays.
    session: Session,
    /// Attempts the connection; yields either a physical layer or the delay
    /// to wait before the next attempt.
    connector: Connector,
    /// Delay waited after an established connection is lost. A zero duration
    /// skips the wait entirely.
    reconnect_delay: Duration,
    /// Receives every `ClientState` value published by the task.
    listener: Box<dyn Listener<ClientState>>,
}

impl ClientTask {
    /// Builds a task from its parts.
    ///
    /// The minimum and maximum connect delays of `connect_strategy` are used to
    /// build the retry strategy handed to the connector, while its
    /// `reconnect_delay` is kept for use after an established connection drops.
    pub(crate) fn new(
        session: Session,
        endpoints: EndpointList,
        connect_strategy: ConnectStrategy,
        connect_options: ConnectOptions,
        connection_handler: PostConnectionHandler,
        listener: Box<dyn Listener<ClientState>>,
    ) -> Self {
        let connector = Connector::new(
            endpoints,
            connect_options,
            RetryStrategy::new(
                connect_strategy.min_connect_delay,
                connect_strategy.max_connect_delay,
            ),
            connection_handler,
        );
        let reconnect_delay = connect_strategy.reconnect_delay;

        Self {
            session,
            connector,
            reconnect_delay,
            listener,
        }
    }

    /// Runs the task to completion and then publishes `ClientState::Shutdown`.
    ///
    /// The result of the inner loop is intentionally discarded: whatever made
    /// it return, the listener is told that the task has shut down.
    pub(crate) async fn run(&mut self) {
        let _ = self.run_impl().await;
        self.notify(ClientState::Shutdown).await;
    }

    /// Publishes `state` to the listener and waits for the update to complete.
    async fn notify(&mut self, state: ClientState) {
        self.listener.update(state).get().await;
    }

    /// Outer loop: publish `Disabled`, wait until the session is enabled, then
    /// keep connecting until the connection loop stops.
    ///
    /// Only a `StopReason::Shutdown` ends this loop; any other outcome of the
    /// connection loop sends the task back to the disabled state.
    async fn run_impl(&mut self) -> Result<(), Shutdown> {
        loop {
            self.notify(ClientState::Disabled).await;
            self.session.wait_for_enabled().await?;

            let outcome = self.run_connection().await;
            if matches!(outcome, Err(StopReason::Shutdown)) {
                return Err(Shutdown);
            }
        }
    }

    /// Runs connection attempts back to back until one of them reports a
    /// `StopReason`, which is returned to the caller.
    async fn run_connection(&mut self) -> Result<(), StopReason> {
        loop {
            match self.run_one_connection().await {
                Ok(()) => continue,
                Err(reason) => break Err(reason),
            }
        }
    }

    /// Performs a single connection attempt.
    ///
    /// On success the session is run over the new physical layer. On failure
    /// the connector supplies a delay, which is logged, published to the
    /// listener, and then waited out through the session.
    async fn run_one_connection(&mut self) -> Result<(), StopReason> {
        self.notify(ClientState::Connecting).await;

        let retry_delay = match self.connector.connect().await {
            Ok(phys) => {
                self.notify(ClientState::Connected).await;
                return self.run_phys(phys).await;
            }
            Err(delay) => delay,
        };

        tracing::info!(
            "waiting {} ms to retry connection",
            retry_delay.as_millis()
        );
        self.notify(ClientState::WaitAfterFailedConnect(retry_delay))
            .await;
        self.session.wait_for_retry(retry_delay).await
    }

    /// Runs the session over an established physical layer until it ends.
    ///
    /// A stop request is propagated as an error. A link error is logged and
    /// published as `WaitAfterDisconnect`; the reconnect delay is then waited
    /// out unless it is zero.
    async fn run_phys(&mut self, mut phys: PhysLayer) -> Result<(), StopReason> {
        let link_err = match self.session.run(&mut phys).await {
            RunError::Stop(reason) => return Err(reason),
            RunError::Link(err) => err,
        };
        tracing::warn!("connection lost - {}", link_err);

        let delay = self.reconnect_delay;
        // The listener is informed even when the delay is zero and no wait follows.
        self.notify(ClientState::WaitAfterDisconnect(delay)).await;

        if delay.is_zero() {
            return Ok(());
        }

        tracing::warn!("waiting {} ms to reconnect", delay.as_millis());
        self.session.wait_for_retry(delay).await
    }
}