nakadion 0.20.0-alpha.17

Types for interacting with the Nakadi Event Broker
Documentation
use nakadi_types::model::subscription::*;

use nakadion::api::ApiClient;
use nakadion::components::{committer::*, connector::*};

use futures::{future::TryFutureExt, stream::TryStreamExt};
use serde_json::Value;
use tokio::spawn;

#[cfg(feature = "reqwest")]
#[tokio::main]
async fn main() -> Result<(), Error> {
    let client = ApiClient::builder().finish_from_env()?;

    let subscription_id = SubscriptionId::from_env()?;

    let connector = client.connector();
    let (stream_id, events_stream) = connector.events_stream::<Value>(subscription_id).await?;

    let f = events_stream.try_for_each(move |(meta, events)| {
        let client = client.clone();
        let committer = client.committer(subscription_id, stream_id);
        async move {
            if let Some(events) = events {
                println!("{:?}", events);
            } else {
                println!("empty line");
            }
            let cursor = &[meta.cursor];
            committer
                .commit(cursor)
                .map_err(|err| Error::new(format!("Could not commit: {}", err)))
                .await?;
            Ok(())
        }
    });

    spawn(f).await.map_err(Error::from_error)?
}

#[cfg(not(feature = "reqwest"))]
fn main() {
    println!("Please enable the `reqwest` feature which is a default feature");
}