use crate::{
client::Client,
error::{CouchError, CouchResult},
types::changes::{ChangeEvent, Event},
};
use futures_core::{Future, Stream};
use futures_util::{ready, FutureExt, StreamExt, TryStreamExt};
use reqwest::{Method, Response, StatusCode};
use std::{
collections::HashMap,
io,
pin::Pin,
task::{Context, Poll},
};
use tokio::io::AsyncBufReadExt;
use tokio_stream::wrappers::LinesStream;
use tokio_util::io::StreamReader;
/// Value sent as the `timeout` query parameter when the stream is switched to
/// infinite mode (see `ChangesStream::set_infinite`).
// NOTE(review): presumably milliseconds, matching CouchDB's maximum changes-feed
// timeout — confirm against the CouchDB documentation.
const COUCH_MAX_TIMEOUT: usize = 60000;
/// A [`Stream`] of change events read from a database's `_changes` endpoint.
///
/// Polling the stream drives a small state machine (`ChangesStreamState`):
/// it issues the HTTP request, reads the response body line by line and
/// yields one item per parsed change event.
pub struct ChangesStream {
/// Sequence value of the most recently seen event; sent as the `since`
/// query parameter on the next request when present.
last_seq: Option<serde_json::Value>,
/// Client used to issue the requests (cloned for every request).
client: Client,
/// Name of the database; the request path is `{database}/_changes`.
database: String,
/// Current position in the request/read state machine.
state: ChangesStreamState,
/// Query parameters sent with every request.
params: HashMap<String, String>,
/// When `true`, the stream issues a new request after the feed reports
/// completion or a read times out, instead of ending.
infinite: bool,
}
/// Internal states of a `ChangesStream`.
enum ChangesStreamState {
/// No request is in flight; the next poll starts one.
Idle,
/// A request has been started; holds the future resolving to its response.
Requesting(Pin<Box<dyn Future<Output = CouchResult<Response>>>>),
/// A successful response is being consumed; holds the body as a stream of text lines.
Reading(Pin<Box<dyn Stream<Item = io::Result<String>>>>),
}
impl ChangesStream {
    /// Builds a changes stream with the default query parameters:
    /// `feed=continuous`, `timeout=0` and `include_docs=true`.
    ///
    /// `last_seq`, when given, is the sequence the first request resumes from.
    pub fn new(client: Client, database: String, last_seq: Option<serde_json::Value>) -> Self {
        let defaults = HashMap::from([
            (String::from("feed"), String::from("continuous")),
            (String::from("timeout"), String::from("0")),
            (String::from("include_docs"), String::from("true")),
        ]);
        Self::with_params(client, database, last_seq, defaults)
    }

    /// Builds a changes stream that sends exactly the given query parameters.
    ///
    /// The stream starts idle (no request is sent until it is polled) and in
    /// non-infinite mode.
    pub fn with_params(
        client: Client,
        database: String,
        last_seq: Option<serde_json::Value>,
        params: HashMap<String, String>,
    ) -> Self {
        let state = ChangesStreamState::Idle;
        Self {
            last_seq,
            client,
            database,
            state,
            params,
            infinite: false,
        }
    }

    /// Replaces the sequence value the next request will resume from.
    pub fn set_last_seq(&mut self, last_seq: Option<serde_json::Value>) {
        self.last_seq = last_seq;
    }

    /// Switches infinite mode on or off and adjusts the `timeout` query
    /// parameter to match: `COUCH_MAX_TIMEOUT` when infinite, `0` otherwise.
    pub fn set_infinite(&mut self, infinite: bool) {
        let timeout_value = if infinite { COUCH_MAX_TIMEOUT } else { 0 };
        self.params
            .insert("timeout".to_string(), timeout_value.to_string());
        self.infinite = infinite;
    }

    /// Returns the sequence value of the most recently seen event, if any.
    pub fn last_seq(&self) -> &Option<serde_json::Value> {
        &self.last_seq
    }

    /// Returns whether the stream is in infinite mode.
    pub fn infinite(&self) -> bool {
        self.infinite
    }
}
/// Sends a `GET {database}/_changes` request with the given query parameters.
///
/// The response is returned as-is; its status is not inspected here. Any
/// error from sending the request is propagated through `?`.
async fn get_changes(
    client: Client,
    database: String,
    params: HashMap<String, String>,
) -> CouchResult<Response> {
    let path = format!("{database}/_changes");
    // Pass the parameters by reference (`&params`).
    let res = client.req(Method::GET, &path, Some(&params)).send().await?;
    Ok(res)
}
impl Stream for ChangesStream {
type Item = CouchResult<ChangeEvent>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
self.state = match self.state {
ChangesStreamState::Idle => {
let mut params = self.params.clone();
if let Some(seq) = &self.last_seq {
params.insert("since".to_string(), seq.to_string());
}
let fut = get_changes(self.client.clone(), self.database.clone(), params);
ChangesStreamState::Requesting(Box::pin(fut))
}
ChangesStreamState::Requesting(ref mut fut) => match ready!(fut.poll_unpin(cx)) {
Err(err) => return Poll::Ready(Some(Err(err))),
Ok(res) => {
if res.status().is_success() {
let stream = res
.bytes_stream()
.map_err(|err| io::Error::new(io::ErrorKind::Other, err));
let reader = StreamReader::new(stream);
let lines = Box::pin(LinesStream::new(reader.lines()));
ChangesStreamState::Reading(lines)
} else {
return Poll::Ready(Some(Err(CouchError::new(
res.status().canonical_reason().unwrap().to_string(),
res.status(),
))));
}
}
},
ChangesStreamState::Reading(ref mut lines) => {
let line = ready!(lines.poll_next_unpin(cx));
match line {
None => ChangesStreamState::Idle,
Some(Err(err)) => {
let inner = err.get_ref().and_then(|err| err.downcast_ref::<reqwest::Error>());
match inner {
Some(reqwest_err) if reqwest_err.is_timeout() && self.infinite => {
ChangesStreamState::Idle
}
Some(reqwest_err) => {
return Poll::Ready(Some(Err(CouchError::new(
reqwest_err.to_string(),
reqwest_err.status().unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
))));
}
_ => {
return Poll::Ready(Some(Err(CouchError::new(
format!("{err}"),
StatusCode::from_u16(500).unwrap(),
))));
}
}
}
Some(Ok(line)) if line.is_empty() => continue,
Some(Ok(line)) => match serde_json::from_str::<Event>(&line) {
Ok(Event::Change(event)) => {
self.last_seq = Some(event.seq.clone());
return Poll::Ready(Some(Ok(event)));
}
Ok(Event::Finished(event)) => {
self.last_seq = Some(event.last_seq.clone());
if !self.infinite {
return Poll::Ready(None);
}
ChangesStreamState::Idle
}
Err(e) => {
return Poll::Ready(Some(Err(e.into())));
}
},
}
}
}
}
}
}
#[cfg(feature = "integration-tests")]
#[cfg(test)]
mod tests {
    use crate::client::Client;
    use futures_util::StreamExt;
    use serde_json::{json, Value};

    /// Inserts ten documents from a background task and checks that an
    /// infinite changes stream yields ten change events for them.
    #[tokio::test]
    async fn should_get_changes() {
        let client = Client::new_local_test().unwrap();
        let db = client.db("should_get_changes").await.unwrap();
        let mut changes = db.changes(None);
        changes.set_infinite(true);
        let t = tokio::spawn({
            let db = db.clone();
            async move {
                let mut docs: Vec<Value> = (0..10)
                    .map(|idx| {
                        json!({
                            "_id": format!("test_{}", idx),
                            "count": idx,
                        })
                    })
                    .collect();
                db.bulk_docs(&mut docs).await.expect("should insert 10 documents");
            }
        });
        let mut collected_changes = vec![];
        while let Some(change) = changes.next().await {
            // Fail on the first stream error: counting `Err` items as changes
            // would let a broken stream satisfy the length check below.
            collected_changes.push(change.expect("changes stream should not yield an error"));
            if collected_changes.len() == 10 {
                break;
            }
        }
        assert_eq!(collected_changes.len(), 10);
        t.await.unwrap();
    }
}