1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
// Private submodule; everything public in it is re-exported below.
mod server;

// Only compiled for test builds.
#[cfg(test)]
pub mod test_request;

// Flatten the public items of `server` into this module's namespace.
pub use self::server::*;
// Re-export the codec type from `fluvio_protocol` from this module.
pub use fluvio_protocol::codec::FluvioCodec;

/// Awaits a request handler and sends its response back through a sink.
///
/// Accepted forms:
///
/// * `call_service!(req, handler, sink, msg)` — reads the API version from
///   `req.header.api_version()`, awaits `handler`, then awaits
///   `sink.send_response(&response, version)`. `msg` is only used in the
///   trace output emitted before the handler runs.
/// * `call_service!(req, handler, sink)` — same as above with an empty
///   trace message.
///
/// A request expression is always required because the response is sent
/// with the API version taken from the request header.
///
/// The expansion uses `.await` and `?`, so it must be placed inside an async
/// context whose return type can absorb the errors of both the handler and
/// `send_response`. It refers to `tracing::trace!` by path, so `tracing` must
/// be resolvable at the call site.
#[macro_export]
macro_rules! call_service {
    ($req:expr, $handler:expr, $sink:expr, $msg:expr) => {{
        // The version is captured before the handler is awaited
        // (presumably because the handler may consume the request — confirm
        // against callers).
        let version = $req.header.api_version();
        tracing::trace!("invoking handler: {}", $msg);
        let response = $handler.await?;
        tracing::trace!("send back response: {:#?}", &response);
        $sink.send_response(&response, version).await?;
        tracing::trace!("finish send");
    }};

    // Short form without a trace message. Delegates through `$crate::` so the
    // recursive call resolves even when the macro is invoked from another
    // crate that has not imported it by name.
    ($req:expr, $handler:expr, $sink:expr) => {
        $crate::call_service!($req, $handler, $sink, "")
    };
}

/// Runs a receive loop over an API request stream, dispatching every
/// successfully received request through the supplied `pattern => expression`
/// arms.
///
/// Accepted forms (a trailing comma after the last arm is allowed):
///
/// * `api_loop!(stream, PAT => EXPR, ...)`
/// * `api_loop!(stream, debug_msg, PAT => EXPR, ...)` — `debug_msg` is
///   appended to the debug log lines; it is evaluated each time it is logged.
///
/// On every iteration the macro awaits `stream.next()`:
///
/// * `Some(Ok(request))` — the request is traced and matched against the arms.
/// * `Some(Err(_))` — a debug line is logged and the loop ends.
/// * `None` — a debug line is logged and the loop ends.
///
/// The expansion uses `.await`, so it must be placed inside an async context.
/// It refers to `futures_util::stream::StreamExt` and `tracing` by path, so
/// both must be resolvable at the call site.
#[macro_export]
macro_rules! api_loop {
    ( $api_stream:ident, $($matcher:pat => $result:expr),* $(,)? ) => {{
        use futures_util::stream::StreamExt;

        loop {
            tracing::debug!("waiting for next api request");
            match $api_stream.next().await {
                Some(Ok(req_message)) => {
                    tracing::trace!("received request: {:#?}",req_message);
                    match req_message {
                        $($matcher => $result),*
                    }
                }
                // Only `Err` items reach this arm; the whole result is logged.
                Some(msg) => {
                    tracing::debug!("no content, end of connection {:#?}", msg);
                    break;
                }
                None => {
                    tracing::debug!("client connect terminated");
                    break;
                }
            }
        }
    }};

    ( $api_stream:ident, $debug_msg:expr, $($matcher:pat => $result:expr),* $(,)? ) => {{
        use futures_util::stream::StreamExt;

        loop {
            tracing::debug!("waiting for next api request: {}",$debug_msg);
            match $api_stream.next().await {
                Some(Ok(req_message)) => {
                    tracing::trace!("received request: {:#?}",req_message);
                    match req_message {
                        $($matcher => $result),*
                    }
                }
                // This form logs the caller's message rather than the error.
                Some(Err(_)) => {
                    tracing::debug!("no content, end of connection {}", $debug_msg);
                    break;
                }
                None => {
                    tracing::debug!("client connect terminated: {}",$debug_msg);
                    break;
                }
            }
        }
    }};
}

/// Waits for exactly one request on `$api_stream` and handles it.
///
/// The macro awaits `$api_stream.next()` once:
///
/// * a successfully received request matching `$matcher` evaluates `$result`,
///   which becomes the value of the macro expression;
/// * any other request is logged as an error and the enclosing function
///   returns `Ok(())`;
/// * a receive error or the end of the stream is traced and the enclosing
///   function returns `Ok(())`.
///
/// Because of the early `return Ok(())`, the macro can only be used inside an
/// async function (or block) whose return type accepts `Ok(())`.
#[macro_export]
macro_rules! wait_for_request {
    ( $api_stream:ident, $matcher:pat => $result:expr) => {{
        use futures_util::stream::StreamExt;

        match $api_stream.next().await {
            Some(Ok(req_message)) => {
                tracing::trace!("received request: {:#?}", req_message);
                match req_message {
                    $matcher => $result,
                    _ => {
                        tracing::error!("unexpected request: {:#?}", req_message);
                        return Ok(());
                    }
                }
            }
            Some(Err(_)) => {
                tracing::trace!("no content, end of connection");
                return Ok(());
            }
            None => {
                tracing::trace!("client connect terminated");
                return Ok(());
            }
        }
    }};
}