titanrt 0.7.0

Typed reactive runtime for real-time systems
Documentation
use std::ops::{Deref, DerefMut};
use std::vec::Vec;

use anyhow::Result;
use parking_lot::{Mutex, MutexGuard, RwLock};

use crate::connector::BaseConnector;
use crate::connector::features::composite::CancelStreamsPolicy;
use crate::utils::CancelToken;

pub struct ConnectorInner<C>
where
    C: BaseConnector,
{
    config: RwLock<Option<C::MainConfig>>,
    instance: Mutex<Option<C>>,
    cancel_streams: CancelStreamsPolicy,
}

impl<C> ConnectorInner<C>
where
    C: BaseConnector,
{
    pub fn new(config: Option<C::MainConfig>, cancel_streams: CancelStreamsPolicy) -> Self {
        Self {
            config: RwLock::new(config),
            instance: Mutex::new(None),
            cancel_streams,
        }
    }

    pub fn update_config(&self, config: Option<C::MainConfig>) {
        {
            let mut guard = self.config.write();
            *guard = config;
        }
        self.drop_instance();
    }

    pub fn unload(&self) {
        self.drop_instance();
    }

    pub fn drop_instance(&self) {
        let mut guard = self.instance.lock();
        match self.cancel_streams {
            CancelStreamsPolicy::OnInstanceDrop => {
                if let Some(connector) = guard.as_mut() {
                    connector.cancel_token().cancel();
                }
            }
            CancelStreamsPolicy::Ignore => {}
        }
        *guard = None;
    }

    pub fn config_snapshot(&self) -> Option<C::MainConfig> {
        self.config.read().clone()
    }

    pub fn ensure(
        &self,
        cancel_token: &CancelToken,
        reserved_core_ids: &Option<Vec<usize>>,
    ) -> Result<Option<ConnectorGuard<'_, C>>> {
        let config = { self.config.read().clone() };

        let Some(config) = config else {
            return Ok(None);
        };

        if let Some(guard) = self.try_get_existing() {
            return Ok(Some(guard));
        }

        let connector = C::init(config, cancel_token.clone(), reserved_core_ids.clone())?;
        let mut guard = self.instance.lock();
        if guard.is_none() {
            *guard = Some(connector);
            return Ok(Some(ConnectorGuard::new(guard)));
        }

        drop(connector);
        Ok(Some(ConnectorGuard::new(guard)))
    }

    pub fn with<F, R>(
        &self,
        cancel_token: &CancelToken,
        reserved_core_ids: &Option<Vec<usize>>,
        f: F,
    ) -> Result<Option<R>>
    where
        F: FnOnce(&mut C) -> Result<R>,
    {
        match self.ensure(cancel_token, reserved_core_ids)? {
            Some(mut guard) => {
                let out = f(&mut guard)?;
                Ok(Some(out))
            }
            None => Ok(None),
        }
    }

    pub fn try_get_existing(&self) -> Option<ConnectorGuard<'_, C>> {
        let guard = self.instance.lock();
        if guard.is_some() {
            Some(ConnectorGuard::new(guard))
        } else {
            None
        }
    }
}

impl<C> Drop for ConnectorInner<C>
where
    C: BaseConnector,
{
    fn drop(&mut self) {
        match self.cancel_streams {
            CancelStreamsPolicy::OnInstanceDrop => {
                if let Some(connector) = self.instance.get_mut().take() {
                    connector.cancel_token().cancel();
                }
            }
            CancelStreamsPolicy::Ignore => return,
        }
    }
}

pub struct ConnectorGuard<'a, C>
where
    C: BaseConnector,
{
    guard: MutexGuard<'a, Option<C>>,
}

impl<'a, C> ConnectorGuard<'a, C>
where
    C: BaseConnector,
{
    fn new(guard: MutexGuard<'a, Option<C>>) -> Self {
        debug_assert!(guard.is_some(), "Connector guard must hold a value");
        Self { guard }
    }
}

impl<'a, C> Deref for ConnectorGuard<'a, C>
where
    C: BaseConnector,
{
    type Target = C;

    fn deref(&self) -> &Self::Target {
        self.guard.as_ref().expect("Connector is uninitialized")
    }
}

impl<'a, C> DerefMut for ConnectorGuard<'a, C>
where
    C: BaseConnector,
{
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.guard.as_mut().expect("Connector is uninitialized")
    }
}