fluvio 0.50.1

The official Fluvio driver for Rust
Documentation
use std::convert::{TryFrom, TryInto};
use std::io::{Error as IoError, ErrorKind};
use std::fmt::Display;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

use fluvio_sc_schema::message::MsgType;
use tracing::{error, debug, instrument};
use event_listener::{Event, EventListener};
use futures_util::stream::StreamExt;
use anyhow::Result;

use fluvio_protocol::Encoder;
use fluvio_protocol::Decoder;
use fluvio_socket::AsyncResponse;
use fluvio_sc_schema::objects::{Metadata, MetadataUpdate, ObjectApiWatchRequest, WatchResponse};
use fluvio_sc_schema::{AdminSpec, TryEncodableFrom};

use super::StoreContext;
use super::CacheMetadataStoreObject;
use crate::metadata::store::actions::LSUpdate;

pub(crate) struct SimpleEvent {
    flag: AtomicBool,
    event: Event,
}

impl SimpleEvent {
    pub(crate) fn shared() -> Arc<Self> {
        Arc::new(Self {
            flag: AtomicBool::new(false),
            event: Event::new(),
        })
    }
    // is flag set
    pub(crate) fn is_set(&self) -> bool {
        self.flag.load(Ordering::SeqCst)
    }

    pub(crate) fn listen(&self) -> EventListener {
        self.event.listen()
    }

    pub(crate) fn notify(&self) {
        self.event.notify(usize::MAX);
    }
}

/// Synchronize metadata from SC
pub(crate) struct MetadataSyncController<S: AdminSpec> {
    store: StoreContext<S>,
    shutdown: Arc<SimpleEvent>,
}

impl<S> MetadataSyncController<S>
where
    S: AdminSpec + 'static + Sync + Send,
    AsyncResponse<ObjectApiWatchRequest>: Send,
    S: Encoder + Decoder + Send + Sync,
    S::Status: Sync + Send + Encoder + Decoder,
    S::IndexKey: Display + Sync + Send,
    CacheMetadataStoreObject<S>: TryFrom<Metadata<S>>,
    <Metadata<S> as TryInto<CacheMetadataStoreObject<S>>>::Error: Display,
{
    pub(crate) fn start(
        store: StoreContext<S>,
        watch_response: AsyncResponse<ObjectApiWatchRequest>,
        shutdown: Arc<SimpleEvent>,
    ) {
        use fluvio_future::task::spawn;

        let controller = Self { store, shutdown };

        debug!(spec = %S::LABEL, "spawning sync controller");
        spawn(controller.dispatch_loop(watch_response));
    }

    #[instrument(
        skip(self,response),
        fields(
            spec = S::LABEL,
        )
    )]
    async fn dispatch_loop(mut self, mut response: AsyncResponse<ObjectApiWatchRequest>) {
        use tokio::select;

        debug!("{} starting dispatch loop", S::LABEL);

        loop {
            // check if shutdown is set
            if self.shutdown.is_set() {
                debug!("{} shutdown exiting", S::LABEL);
                break;
            }

            select! {
                _ = self.shutdown.listen() => {
                    break;
                }

                item = response.next() => {
                    debug!(spec = %S::LABEL,"received request");

                    match item {
                        Some(Ok(watch_response)) => {
                            let update_result = watch_response.downcast() as Result<Option<WatchResponse<S>>>;
                            match update_result {
                                Ok(update_opt) => {
                                    if let Some(update) = update_opt {
                                        if let Err(err) = self.sync_metadata(update.inner()).await {
                                            error!("Processing updates: {}", err);
                                        }
                                    } else {
                                        error!("invalid update type: {s}", s = S::LABEL);
                                    }
                                },
                                Err(err) => {
                                    error!("Decoding metadata update response, skipping: {}", err);
                                }
                            }

                        },
                        Some(Err(err)) => {
                            error!("Receiving response, ending: {}", err);
                            break;
                        },
                        None => {
                            debug!("No more items to receive from stream!");
                            break;
                        }
                    }
                }
            }
        }

        debug!("{} terminated", S::LABEL);
    }

    // process updates from sc
    #[instrument(
        skip(self, updates),
        fields(
            epoch = updates.epoch,
            spec = %S::LABEL,
        ),
    )]
    async fn sync_metadata(&mut self, updates: MetadataUpdate<S>) -> Result<(), IoError> {
        // Full sync
        if !updates.all.is_empty() {
            debug!(
                count = updates.all.len(),
                "Received full sync, setting store objects:"
            );
            let mut objects: Vec<CacheMetadataStoreObject<S>> = vec![];
            for meta in updates.all.into_iter() {
                let store_obj: Result<CacheMetadataStoreObject<S>, _> = meta.try_into();
                match store_obj {
                    Ok(obj) => {
                        objects.push(obj);
                    }
                    Err(err) => {
                        return Err(IoError::new(
                            ErrorKind::InvalidData,
                            format!("problem converting: {err}"),
                        ));
                    }
                }
            }
            self.store.store().sync_all(objects).await;
            return Ok(());
        }

        // Partial sync
        if !updates.changes.is_empty() {
            debug!(
                count = updates.changes.len(),
                "Received partial sync, updating store objects:"
            );
            let changes: Vec<_> = updates
                .changes
                .into_iter()
                .map(|msg| {
                    let (meta, typ) = (msg.content, msg.header);
                    let obj: Result<CacheMetadataStoreObject<S>, _> = meta.try_into();
                    obj.map(|it| (typ, it))
                })
                .collect::<Result<Vec<_>, _>>()
                .map_err(|e| {
                    IoError::new(ErrorKind::InvalidData, format!("problem converting: {e}"))
                })?
                .into_iter()
                // .map(|it| LSUpdate::Mod(it))
                .map(|(typ, obj)| match typ {
                    MsgType::UPDATE => LSUpdate::Mod(obj),
                    MsgType::DELETE => LSUpdate::Delete(obj.key),
                })
                .collect();

            self.store.store().apply_changes(changes).await;
            return Ok(());
        }

        debug!("No metadata updates received. Not syncing store");
        Ok(())
    }
}