1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
//! Broker trait definition
use std::sync::Arc;
use async_trait::async_trait;
use futures::{Future, FutureExt, sink::{Sink}, stream::{Stream, StreamExt}};

use super::{Running, Context};

// use crate::util::{Conclude};

/// Broker of the broker-reader-writer pattern
#[async_trait]
pub trait Broker: Sized {
    /// Item sent to the broker
    type Item: Send + 'static;
    /// Item sent to the writer 
    type WriterItem: Send + 'static;
    /// Ok result from `op`
    type Ok: Send;
    /// Error result from `op`
    type Error: std::error::Error + Send;

    /// The operation to perform
    async fn op<W>(
        &mut self, // use self to maintain state
        ctx: &Arc<Context<Self::Item>>,
        item: Self::Item,
        writer: W,
    ) -> Running<Result<Self::Ok, Self::Error>, Option<Self::Error>>
    where W: Sink<Self::WriterItem, Error = flume::SendError<Self::WriterItem>> + Send + Unpin; // None will stop the loop

    /// Handles the result of each op
    /// 
    /// Returns a `None` to stop the whole loop
    async fn handle_result(res: Result<Self::Ok, Self::Error>) -> Running<(), Option<Self::Error>> {
        if let Err(_err) = res {
            #[cfg(feature = "debug")]
            log::error!("{:?}", _err);
        }
        Running::Continue(())
    }

    /// Runs the operation in a loop
    async fn broker_loop<S, W, RH, WH>(
        mut self, 
        mut items: S, 
        mut writer: W, 
        ctx: Arc<Context<Self::Item>>,
        stop: flume::Receiver<()>,
        reader_handle: RH, 
        writer_handle: WH
    ) -> Result<(), Self::Error>
    where 
        S: Stream<Item = Self::Item> + Send + Unpin,
        W: Sink<Self::WriterItem, Error = flume::SendError<Self::WriterItem>> + Send + Unpin,
        RH: Future + Send,
        WH: Future + Send,
    {
        let this = &mut self;
        let f = Self::handle_result;
        loop {
            futures::select! {
                _ = stop.recv_async() => {
                    break;
                },
                item = items.next().fuse() => {
                    if let Some(item) = item {
                        match this.op(&ctx, item, &mut writer).await {
                            Running::Continue(res) => {
                                match f(res).await {
                                    Running::Continue(_) => { },
                                    Running::Stop(e) => {
                                        match e {
                                            None => return Ok(()),
                                            Some(err) => return Err(err),
                                        }
                                    }
                                }
                            },
                            // None is used to indicate stopping the loop
                            Running::Stop(e) => {
                                match e {
                                    None => return Ok(()),
                                    Some(err) => return Err(err)
                                }
                            }
                        }
                    }
                }
            }
        }

        // Stop the writer
        #[cfg(feature = "debug")]
        log::debug!("Dropping writer");
        drop(writer); 
        #[cfg(feature = "debug")]
        log::debug!(".await writer handle");
        let _ = writer_handle.await;

        // Stop the reader
        if !ctx.reader_stop.is_disconnected() {
            if ctx.reader_stop.send_async(()).await.is_ok() {
                let _ = reader_handle.await;
            }
        }

        #[cfg(feature = "debug")]
        log::debug!("Exiting broker loop");
        
        Ok(())
    }
}