1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
//! The Rust client library for writing streaming applications with Fluvio
//!
//! Fluvio is a high performance, low latency data streaming platform built for developers.
//!
//! When writing streaming applications, two of your core behaviors are producing messages
//! and consuming messages. When you produce a message, you send it to a Fluvio cluster
//! where it is recorded and saved for later usage. When you consume a message, you are
//! reading a previously-stored message from that same Fluvio cluster. Let's get started
//! with a quick example where we produce and consume some messages.
//!
//! # Prerequisites
//!
//! [Install Fluvio](#installation)
//!
//! # Fluvio Echo
//!
//! The easiest way to see Fluvio in action is to produce some messages and to consume
//! them right away. In this sense, we can use Fluvio to make an "echo service".
//!
//! All messages in Fluvio are sent to a sort of category called a `Topic`. You can think
//! of a Topic as a named folder where you want to store some files, which would be your
//! messages. If you're familiar with relational databases, you can think of a Topic as
//! being similar to a database table, but for streaming.
//!
//! As the application developer, you get to decide what Topics you create and which
//! messages you send to them. We need to set up a Topic before running our code. For the
//! echo example, we'll call our topic `echo`.
//!
//! # Example
//!
//! The easiest way to create a Fluvio Topic is by using the [Fluvio CLI].
//!
//! ```bash
//! $ fluvio topic create echo
//! topic "echo" created
//! ```
//!
//! There are convenience methods that let you get up and running quickly using default
//! configurations. Later if you want to customize your setup, you can directly use the
//! [`Fluvio`] client object.
//!
//! ```no_run
//! # mod futures {
//! #     pub use futures_util::stream::StreamExt;
//! # }
//! use std::time::Duration;
//! use fluvio::{Offset, FluvioError};
//! use futures::StreamExt;
//!
//! async_std::task::spawn(produce_records());
//! if let Err(e) = async_std::task::block_on(consume_records()) {
//!     println!("Error: {}", e);
//! }
//!
//! async fn produce_records() -> Result<(), FluvioError> {
//!     let producer = fluvio::producer("echo").await?;
//!     for i in 0..10u8 {
//!         producer.send_record(format!("Hello, Fluvio {}!", i), 0).await?;
//!         async_std::task::sleep(Duration::from_secs(1)).await;
//!     }
//!     Ok(())
//! }
//!
//! async fn consume_records() -> Result<(), FluvioError> {
//!     let consumer = fluvio::consumer("echo", 0).await?;
//!     let mut stream = consumer.stream(Offset::beginning()).await?;
//!
//!     while let Some(Ok(record)) = stream.next().await {
//!         if let Some(bytes) = record.try_into_bytes() {
//!             let string = String::from_utf8_lossy(&bytes);
//!             println!("Got record: {}", string);
//!         }
//!     }
//!     Ok(())
//! }
//! ```
//!
//! [Fluvio CLI]: https://nightly.fluvio.io/docs/cli/
//! [`Fluvio`]: ./struct.Fluvio.html
#![cfg_attr(
    feature = "nightly",
    doc(include = "../../../website/kubernetes/INSTALL.md")
)]

mod error;
mod client;
mod admin;
mod params;
mod consumer;
mod producer;
mod offset;
mod sync;
mod spu;

pub mod config;

pub use error::FluvioError;
pub use config::FluvioConfig;
pub use producer::TopicProducer;
pub use consumer::{PartitionConsumer, ConsumerConfig};
pub use offset::Offset;

pub use crate::admin::FluvioAdmin;
pub use crate::client::Fluvio;

/// Connects to Fluvio and returns a `TopicProducer` for the named topic
///
/// This convenience function connects using the current profile
/// settings. When you need a custom configuration, create a
/// [`Fluvio`] client object yourself and request the producer from it.
///
/// # Errors
///
/// Returns a `FluvioError` if the connection cannot be established or
/// if the producer for `topic` cannot be created.
///
/// # Example
///
/// ```no_run
/// # use fluvio::FluvioError;
/// # async fn do_produce() -> Result<(), FluvioError> {
/// let producer = fluvio::producer("my-topic").await?;
/// producer.send_record("Hello, world!", 0).await?;
/// # Ok(())
/// # }
/// ```
///
/// [`Fluvio`]: ./struct.Fluvio.html
pub async fn producer<S: Into<String>>(topic: S) -> Result<TopicProducer, FluvioError> {
    // The client is only a stepping stone here: connect, then ask it for the producer.
    let client = Fluvio::connect().await?;
    // `?` is kept on both calls so either failure surfaces as a `FluvioError`.
    let topic_producer = client.topic_producer(topic).await?;
    Ok(topic_producer)
}

/// Connects to Fluvio and returns a `PartitionConsumer` for the given topic and partition
///
/// This convenience function connects using the current profile
/// settings. When you need a custom configuration, create a
/// [`Fluvio`] client object yourself and request the consumer from it.
///
/// # Errors
///
/// Returns a `FluvioError` if the connection cannot be established or
/// if the consumer for `topic` and `partition` cannot be created.
///
/// # Example
///
/// ```no_run
/// # use fluvio::{ConsumerConfig, FluvioError, Offset};
/// # mod futures {
/// #     pub use futures_util::stream::StreamExt;
/// # }
/// # async fn do_consume() -> Result<(), FluvioError> {
/// use futures::StreamExt;
/// let consumer = fluvio::consumer("my-topic", 0).await?;
/// let mut stream = consumer.stream(Offset::beginning()).await?;
/// while let Some(Ok(record)) = stream.next().await {
///     if let Some(bytes) = record.try_into_bytes() {
///         let string = String::from_utf8_lossy(&bytes);
///         println!("Got record: {}", string);
///     }
/// }
/// # Ok(())
/// # }
/// ```
///
/// [`Fluvio`]: ./struct.Fluvio.html
pub async fn consumer<S: Into<String>>(
    topic: S,
    partition: i32,
) -> Result<PartitionConsumer, FluvioError> {
    // The client is only a stepping stone here: connect, then ask it for the consumer.
    let client = Fluvio::connect().await?;
    // `?` is kept on both calls so either failure surfaces as a `FluvioError`.
    let partition_consumer = client.partition_consumer(topic, partition).await?;
    Ok(partition_consumer)
}

/// Re-exported metadata from the `fluvio_sc_schema` crate (sc-api)
///
/// Each submodule glob re-exports the public items of the
/// `fluvio_sc_schema` module that carries the same name.
pub mod metadata {

    /// Public items of `fluvio_sc_schema::core`
    pub mod core {
        pub use fluvio_sc_schema::core::*;
    }

    /// Public items of `fluvio_sc_schema::objects`
    pub mod objects {
        pub use fluvio_sc_schema::objects::*;
    }

    /// Public items of `fluvio_sc_schema::partition`
    pub mod partition {
        pub use fluvio_sc_schema::partition::*;
    }

    /// Public items of `fluvio_sc_schema::spg`
    pub mod spg {
        pub use fluvio_sc_schema::spg::*;
    }

    /// Public items of `fluvio_sc_schema::spu`
    pub mod spu {
        pub use fluvio_sc_schema::spu::*;
    }

    /// Public items of `fluvio_sc_schema::store`
    pub mod store {
        pub use fluvio_sc_schema::store::*;
    }

    /// Public items of `fluvio_sc_schema::topic`
    pub mod topic {
        pub use fluvio_sc_schema::topic::*;
    }
}

/// Re-exports the public items of the external `dataplane` crate
///
/// The leading `::` spells out that the import names the external crate,
/// not this module, which happens to share its name.
pub mod dataplane {
    pub use ::dataplane::*;
}