nautilus-network 0.59.0

Network communication machinery for the Nautilus trading engine
// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Socket configuration.
//!
//! # Reconnection Strategy
//!
//! The default configuration uses unlimited reconnection attempts (`reconnect_max_attempts: None`).
//! This is intentional for trading systems because:
//! - Venues may be down for extended periods but eventually recover.
//! - Exponential backoff already prevents resource waste.
//! - Automatic recovery can be useful when manual intervention is not desirable.
//!
//! Use `Some(n)` primarily for testing, development, or non-critical connections.

use std::fmt::Debug;

use tokio_tungstenite::tungstenite::stream::Mode;

use super::types::TcpMessageHandler;
use crate::error::{NetworkConfigError, NetworkConfigResult};

/// Configuration for TCP socket connection.
///
/// Instances are normally created through the generated builder
/// (`SocketConfig::builder()`), whose public `build` finisher runs
/// [`SocketConfig::validate`] before handing back the configuration. The
/// builder's generated finisher is renamed to `build_inner` and given empty
/// visibility so that it is only used by that validating `build`.
///
/// All fields are public, so a value mutated after construction can be
/// re-checked by calling [`SocketConfig::validate`] again.
#[derive(bon::Builder)]
#[builder(finish_fn(name = build_inner, vis = ""))]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.network", from_py_object)
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.network")
)]
pub struct SocketConfig {
    /// The URL to connect to.
    ///
    /// Must not be empty or whitespace-only (checked by [`SocketConfig::validate`]).
    pub url: String,
    /// The connection mode {Plain, TLS}.
    pub mode: Mode,
    /// The sequence of bytes which separates lines.
    pub suffix: Vec<u8>,
    /// The optional function to handle incoming messages.
    pub message_handler: Option<TcpMessageHandler>,
    /// The optional heartbeat as an `(interval, message)` pair.
    ///
    /// The interval must be non-zero (checked by [`SocketConfig::validate`]).
    /// NOTE(review): the interval's time unit is not stated in this file — confirm
    /// against the code that schedules the heartbeat.
    pub heartbeat: Option<(u64, Vec<u8>)>,
    /// The timeout (milliseconds) for reconnection attempts.
    ///
    /// Must be non-zero when set.
    pub reconnect_timeout_ms: Option<u64>,
    /// The initial reconnection delay (milliseconds) for reconnects.
    ///
    /// Must be non-zero when set, and must not exceed `reconnect_delay_max_ms`
    /// when both are set.
    pub reconnect_delay_initial_ms: Option<u64>,
    /// The maximum reconnect delay (milliseconds) for exponential backoff.
    ///
    /// Must be non-zero when set.
    pub reconnect_delay_max_ms: Option<u64>,
    /// The exponential backoff factor for reconnection delays.
    ///
    /// Must be finite and at least `1.0` when set.
    pub reconnect_backoff_factor: Option<f64>,
    /// The maximum jitter (milliseconds) added to reconnection delays.
    ///
    /// Zero is accepted and disables jitter; this field is deliberately not
    /// range-checked by [`SocketConfig::validate`].
    pub reconnect_jitter_ms: Option<u64>,
    /// The maximum number of initial connection attempts (default: 5).
    pub connection_max_retries: Option<u32>,
    /// The maximum number of reconnection attempts before giving up.
    /// - `None`: Unlimited reconnection attempts (default, recommended for production).
    /// - `Some(n)`: After n failed attempts, transition to CLOSED state.
    pub reconnect_max_attempts: Option<u32>,
    /// The idle timeout (milliseconds) for the read task.
    /// When set, the read task will break and trigger reconnection if no data
    /// is received within this duration. Useful for detecting silently dead
    /// connections where the server stops sending without closing.
    ///
    /// Must be non-zero when set.
    pub idle_timeout_ms: Option<u64>,
    /// The path to the certificates directory.
    pub certs_dir: Option<String>,
}

impl<S: socket_config_builder::IsComplete> SocketConfigBuilder<S> {
    /// Finishes the builder and returns the [`SocketConfig`] only if it passes validation.
    ///
    /// The configuration is first assembled by the generated `build_inner` finisher and
    /// then checked with [`SocketConfig::validate`]; on success the assembled value is
    /// returned unchanged.
    ///
    /// # Errors
    ///
    /// Returns the [`NetworkConfigError`] produced by [`SocketConfig::validate`] if any
    /// field fails validation.
    pub fn build(self) -> NetworkConfigResult<SocketConfig> {
        let candidate = self.build_inner();
        // `validate` only borrows the candidate, so it can be moved out once the check passes.
        candidate.validate().map(|()| candidate)
    }
}

impl SocketConfig {
    /// Validates the socket settings, gathering every problem found in a single pass.
    ///
    /// Checks run in a fixed order — URL, heartbeat, millisecond timing fields, backoff
    /// factor, then initial-versus-maximum delay — and each failure is recorded instead of
    /// returning early. The accumulated list is handed to `NetworkConfigError::collect`,
    /// which produces the final result.
    ///
    /// # Errors
    ///
    /// Returns a [`NetworkConfigError`] if any of the following hold:
    /// - `url` is empty or whitespace-only.
    /// - the heartbeat interval is zero.
    /// - `reconnect_timeout_ms`, `reconnect_delay_initial_ms`, `reconnect_delay_max_ms` or
    ///   `idle_timeout_ms` is set to zero.
    /// - `reconnect_backoff_factor` is not finite or is below `1.0`.
    /// - `reconnect_delay_initial_ms` exceeds `reconnect_delay_max_ms`.
    pub fn validate(&self) -> NetworkConfigResult<()> {
        let mut errors = Vec::new();

        let url_is_blank = self.url.trim().is_empty();
        if url_is_blank {
            errors.push(NetworkConfigError::invalid("url", "must not be empty"));
        }

        // Only the interval half of the heartbeat pair is constrained.
        if matches!(self.heartbeat, Some((0, _))) {
            errors.push(NetworkConfigError::invalid(
                "heartbeat",
                "interval must be positive",
            ));
        }

        // `reconnect_jitter_ms` is deliberately absent from this table: zero disables
        // jitter and `ExponentialBackoff::new` accepts it.
        let timing_fields = [
            ("reconnect_timeout_ms", self.reconnect_timeout_ms),
            (
                "reconnect_delay_initial_ms",
                self.reconnect_delay_initial_ms,
            ),
            ("reconnect_delay_max_ms", self.reconnect_delay_max_ms),
            ("idle_timeout_ms", self.idle_timeout_ms),
        ];
        // Array order is preserved, so errors are reported in declaration order.
        errors.extend(timing_fields.into_iter().filter_map(|(field, value)| {
            value.filter(|ms| *ms == 0).map(|ms| {
                NetworkConfigError::invalid(field, format!("must be positive, was {ms}"))
            })
        }));

        match self.reconnect_backoff_factor {
            // NaN and both infinities fail `is_finite`; remaining finite values must be >= 1.0.
            Some(factor) if !factor.is_finite() || factor < 1.0 => {
                errors.push(NetworkConfigError::invalid(
                    "reconnect_backoff_factor",
                    format!("must be finite and >= 1.0, was {factor}"),
                ));
            }
            Some(_) | None => {}
        }

        // The ordering check only applies when both delays are configured.
        let inverted_delays = self
            .reconnect_delay_initial_ms
            .zip(self.reconnect_delay_max_ms)
            .filter(|(initial, max)| initial > max);
        if let Some((initial, max)) = inverted_delays {
            errors.push(NetworkConfigError::invalid(
                "reconnect_delay_initial_ms",
                format!("must not exceed reconnect_delay_max_ms ({max}), was {initial}"),
            ));
        }

        NetworkConfigError::collect(errors)
    }
}

impl Debug for SocketConfig {
    /// Formats every field of the configuration as a debug struct.
    ///
    /// The message handler itself is never printed: when one is present the field is shown
    /// as `Some("<function>")`, otherwise as `None`. All other fields use their own `Debug`
    /// output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Stand-in for the handler so only its presence is reported.
        let handler_repr = self.message_handler.is_some().then_some("<function>");

        let mut out = f.debug_struct("SocketConfig");
        out.field("url", &self.url);
        out.field("mode", &self.mode);
        out.field("suffix", &self.suffix);
        out.field("message_handler", &handler_repr);
        out.field("heartbeat", &self.heartbeat);
        out.field("reconnect_timeout_ms", &self.reconnect_timeout_ms);
        out.field(
            "reconnect_delay_initial_ms",
            &self.reconnect_delay_initial_ms,
        );
        out.field("reconnect_delay_max_ms", &self.reconnect_delay_max_ms);
        out.field("reconnect_backoff_factor", &self.reconnect_backoff_factor);
        out.field("reconnect_jitter_ms", &self.reconnect_jitter_ms);
        out.field("connection_max_retries", &self.connection_max_retries);
        out.field("reconnect_max_attempts", &self.reconnect_max_attempts);
        out.field("idle_timeout_ms", &self.idle_timeout_ms);
        out.field("certs_dir", &self.certs_dir);
        out.finish()
    }
}

impl Clone for SocketConfig {
    /// Produces a field-by-field copy of the configuration.
    ///
    /// Owned fields (`url`, `suffix`, `message_handler`, `heartbeat`, `certs_dir`) are
    /// cloned; the remaining fields are plain copies.
    fn clone(&self) -> Self {
        // Exhaustive destructuring: adding a field to the struct forces this impl to be updated.
        let Self {
            url,
            mode,
            suffix,
            message_handler,
            heartbeat,
            reconnect_timeout_ms,
            reconnect_delay_initial_ms,
            reconnect_delay_max_ms,
            reconnect_backoff_factor,
            reconnect_jitter_ms,
            connection_max_retries,
            reconnect_max_attempts,
            idle_timeout_ms,
            certs_dir,
        } = self;

        Self {
            url: url.clone(),
            mode: *mode,
            suffix: suffix.clone(),
            message_handler: message_handler.clone(),
            heartbeat: heartbeat.clone(),
            reconnect_timeout_ms: *reconnect_timeout_ms,
            reconnect_delay_initial_ms: *reconnect_delay_initial_ms,
            reconnect_delay_max_ms: *reconnect_delay_max_ms,
            reconnect_backoff_factor: *reconnect_backoff_factor,
            reconnect_jitter_ms: *reconnect_jitter_ms,
            connection_max_retries: *connection_max_retries,
            reconnect_max_attempts: *reconnect_max_attempts,
            idle_timeout_ms: *idle_timeout_ms,
            certs_dir: certs_dir.clone(),
        }
    }
}

#[cfg(test)]
mod tests {
    //! Unit tests for [`SocketConfig`] construction and validation.
    //!
    //! Rejection tests mutate a known-good baseline one field at a time so that each case
    //! isolates a single rule; acceptance tests pin the inclusive boundaries of those rules.

    use rstest::rstest;
    use tokio_tungstenite::tungstenite::stream::Mode;

    use super::SocketConfig;
    use crate::error::NetworkConfigError;

    /// Builds the minimal configuration that passes validation; every optional field is unset.
    fn valid_config() -> SocketConfig {
        SocketConfig::builder()
            .url("tcp://127.0.0.1:8080".to_string())
            .mode(Mode::Plain)
            .suffix(vec![b'\n'])
            .build()
            .expect("baseline socket config should be valid")
    }

    #[rstest]
    fn test_builder_accepts_valid_config() {
        let result = SocketConfig::builder()
            .url("tcp://127.0.0.1:8080".to_string())
            .mode(Mode::Plain)
            .suffix(vec![b'\n'])
            .build();

        assert!(result.is_ok());
    }

    #[rstest]
    fn test_validate_accepts_zero_jitter() {
        let mut config = valid_config();
        config.reconnect_jitter_ms = Some(0);

        assert!(config.validate().is_ok());
    }

    // Boundary: the backoff rule is `>= 1.0`, so exactly 1.0 must be accepted.
    #[rstest]
    fn test_validate_accepts_unit_backoff_factor() {
        let mut config = valid_config();
        config.reconnect_backoff_factor = Some(1.0);

        assert!(config.validate().is_ok());
    }

    // Boundary: the ordering rule is `initial > max`, so equal delays must be accepted.
    #[rstest]
    fn test_validate_accepts_equal_delay_initial_and_max() {
        let mut config = valid_config();
        config.reconnect_delay_initial_ms = Some(1_000);
        config.reconnect_delay_max_ms = Some(1_000);

        assert!(config.validate().is_ok());
    }

    #[rstest]
    #[case::empty_url(|c: &mut SocketConfig| c.url = String::new(), "url")]
    #[case::blank_url(|c: &mut SocketConfig| c.url = "   ".to_string(), "url")]
    #[case::heartbeat(|c: &mut SocketConfig| c.heartbeat = Some((0, vec![])), "heartbeat")]
    #[case::reconnect_timeout(|c: &mut SocketConfig| c.reconnect_timeout_ms = Some(0), "reconnect_timeout_ms")]
    #[case::reconnect_delay_initial(|c: &mut SocketConfig| c.reconnect_delay_initial_ms = Some(0), "reconnect_delay_initial_ms")]
    #[case::reconnect_delay_max(|c: &mut SocketConfig| c.reconnect_delay_max_ms = Some(0), "reconnect_delay_max_ms")]
    #[case::idle_timeout(|c: &mut SocketConfig| c.idle_timeout_ms = Some(0), "idle_timeout_ms")]
    fn test_validate_rejects_invalid_field(
        #[case] mutate: fn(&mut SocketConfig),
        #[case] expected_field: &str,
    ) {
        let mut config = valid_config();
        mutate(&mut config);

        let err = config
            .validate()
            .expect_err("invalid value should be rejected");

        assert!(
            matches!(err, NetworkConfigError::Invalid { field, .. } if field == expected_field)
        );
    }

    #[rstest]
    #[case::too_small(0.5)]
    #[case::negative(-2.0)]
    #[case::nan(f64::NAN)]
    #[case::infinite(f64::INFINITY)]
    #[case::neg_infinite(f64::NEG_INFINITY)]
    fn test_validate_rejects_invalid_backoff_factor(#[case] factor: f64) {
        let mut config = valid_config();
        config.reconnect_backoff_factor = Some(factor);

        let err = config
            .validate()
            .expect_err("invalid backoff factor should be rejected");

        assert!(
            matches!(err, NetworkConfigError::Invalid { field, .. } if field == "reconnect_backoff_factor")
        );
    }

    #[rstest]
    fn test_validate_rejects_delay_initial_exceeding_max() {
        let mut config = valid_config();
        config.reconnect_delay_initial_ms = Some(5_000);
        config.reconnect_delay_max_ms = Some(1_000);

        let err = config
            .validate()
            .expect_err("initial delay above max should be rejected");

        assert!(
            matches!(err, NetworkConfigError::Invalid { field, .. } if field == "reconnect_delay_initial_ms")
        );
    }

    #[rstest]
    fn test_validate_collects_multiple_errors() {
        let mut config = valid_config();
        config.url = String::new();
        config.reconnect_timeout_ms = Some(0);

        let err = config.validate().expect_err("multiple invalid fields");

        // Matched without a wildcard so a new error variant forces this test to be revisited.
        match err {
            NetworkConfigError::Multiple { errors } => assert_eq!(errors.len(), 2),
            other @ NetworkConfigError::Invalid { .. } => {
                panic!("expected Multiple, was {other:?}")
            }
        }
    }
}