changes_stream2/lib.rs
//! The `changes-stream2` crate gives you a `futures::Stream` of
//! CouchDB `_changes` feed events.
3
4use bytes::{buf::IntoIter, Bytes};
5use futures_util::stream::Stream;
6use log::error;
7use serde_json::Value;
8use std::{mem::replace, pin::Pin, task::Poll, time::Duration};
9
10mod error;
11mod event;
12pub use error::Error;
13pub use event::{Change, ChangeEvent, Event, FinishedEvent};
14
15/// A structure which implements futures::Stream
16pub struct ChangesStream {
17 /// metrics bytes counter
18 #[cfg(feature = "metrics")]
19 bytes: metrics::Counter,
20 /// metrics entries counter
21 #[cfg(feature = "metrics")]
22 entries: metrics::Counter,
23 /// Source of http chunks provided by reqwest
24 source: Pin<Box<dyn Stream<Item = reqwest::Result<Bytes>> + Send>>,
25 /// Buffer of current line and current chunk iterator
26 buf: (Vec<u8>, Option<IntoIter<Bytes>>),
27}
28
29impl ChangesStream {
30 /// Constructs a new `ChangesStream` struct with a post payload
31 ///
32 /// Takes a single argument, `db`, which represents the
33 /// url of the data you wish to stream.
34 ///
35 /// For example, to create a new `ChangesStream` struct
36 /// for the npmjs registry, you would write:
37 ///
38 /// ```no_run
39 /// # use changes_stream2::{ChangesStream, Event};
40 /// # use futures_util::stream::StreamExt;
41 /// #
42 /// # #[tokio::main]
43 /// # async fn main() {
44 /// # let url = "https://replicate.npmjs.com/_changes?filter=_selector".to_string();
45 /// # let mut changes = ChangesStream::with_post_payload(url, &serde_json::json!({"selector": { "_id": { "$regex": "^_design/" }}})).await.unwrap();
46 /// # while let Some(event) = changes.next().await {
47 /// # match event {
48 /// # Ok(Event::Change(change)) => println!("Change ({}): {}", change.seq, change.id),
49 /// # Ok(Event::Finished(finished)) => println!("Finished: {}", finished.last_seq),
50 /// # Err(err) => println!("Error: {:?}", err),
51 /// # }
52 /// # }
53 /// # }
54 /// ```
55 pub async fn with_post_payload(db: String, payload: &Value) -> Result<Self, Error> {
56 let url = url::Url::parse(&db)?;
57 #[cfg(feature = "metrics")]
58 let database = regex::Regex::new(r"(?m)[_/]+")
59 .unwrap()
60 .replace_all(
61 &format!("{}_{}", url.host_str().unwrap_or_default(), url.path()),
62 "_",
63 )
64 .to_string();
65
66 let client = reqwest::ClientBuilder::new()
67 .connect_timeout(Duration::from_secs(60))
68 .read_timeout(Duration::from_secs(60))
69 .build()?;
70 let res = client.post(url).json(payload).send().await?;
71 let status = res.status();
72 if !status.is_success() {
73 let body = res.text().await.unwrap_or_default();
74 return Err(Error::InvalidResponse { status, body });
75 }
76 let source = Pin::new(Box::new(res.bytes_stream()));
77
78 #[cfg(feature = "metrics")]
79 let (bytes, entries) = {
80 let bytes_name = "couchdb_changes_bytes_total";
81 let entries_name = "couchdb_changes_entries_total";
82 metrics::describe_counter!(bytes_name, metrics::Unit::Bytes, "Changes stream bytes");
83 metrics::describe_counter!(
84 entries_name,
85 metrics::Unit::Count,
86 "Changes stream entries"
87 );
88 (
89 metrics::counter!(bytes_name, "database" => database.clone()),
90 metrics::counter!(entries_name, "database" => database),
91 )
92 };
93
94 Ok(Self {
95 #[cfg(feature = "metrics")]
96 bytes,
97 #[cfg(feature = "metrics")]
98 entries,
99 source,
100 buf: (Vec::new(), None),
101 })
102 }
103
104 /// Constructs a new `ChangesStream` struct
105 ///
106 /// Takes a single argument, `db`, which represents the
107 /// url of the data you wish to stream.
108 ///
109 /// For example, to create a new `ChangesStream` struct
110 /// for the npmjs registry, you would write:
111 ///
112 /// ```no_run
113 /// # use changes_stream2::{ChangesStream, Event};
114 /// # use futures_util::stream::StreamExt;
115 /// #
116 /// # #[tokio::main]
117 /// # async fn main() {
118 /// # let url = "https://replicate.npmjs.com/_changes".to_string();
119 /// # let mut changes = ChangesStream::new(url).await.unwrap();
120 /// # while let Some(event) = changes.next().await {
121 /// # match event {
122 /// # Ok(Event::Change(change)) => println!("Change ({}): {}", change.seq, change.id),
123 /// # Ok(Event::Finished(finished)) => println!("Finished: {}", finished.last_seq),
124 /// # Err(err) => println!("Error: {:?}", err),
125 /// # }
126 /// # }
127 /// # }
128 /// ```
129 pub async fn new(db: String) -> Result<Self, Error> {
130 Self::with_post_payload(db, &serde_json::json!({})).await
131 }
132}
133
134impl Stream for ChangesStream {
135 type Item = Result<Event, Error>;
136
137 fn poll_next(
138 mut self: std::pin::Pin<&mut Self>,
139 cx: &mut std::task::Context<'_>,
140 ) -> std::task::Poll<Option<Self::Item>> {
141 'main: loop {
142 if self.buf.1.is_none() {
143 match Stream::poll_next(self.source.as_mut(), cx) {
144 Poll::Pending => return Poll::Pending,
145 Poll::Ready(None) => return Poll::Ready(None),
146 Poll::Ready(Some(Ok(chunk))) => {
147 #[cfg(feature = "metrics")]
148 self.bytes.increment(chunk.len() as u64);
149
150 self.buf.1 = Some(chunk.into_iter())
151 }
152 Poll::Ready(Some(Err(err))) => {
153 error!("Error getting next chunk: {:?}", err);
154 return Poll::Ready(None);
155 }
156 }
157 } else {
158 let (line, chunk_iter) = &mut self.buf;
159 let iter = chunk_iter.as_mut().unwrap();
160
161 loop {
162 if let Some(byte) = iter.next() {
163 if byte == 0x0A {
164 // Found '\n', end of line
165 break;
166 }
167 line.push(byte);
168 } else {
169 // We need another chunk to fill the line
170 *chunk_iter = None;
171 continue 'main;
172 }
173 }
174
175 let line = replace(line, Vec::with_capacity(line.len() * 2));
176 if line.len() < 14 {
177 // skip prologue, epilogue and empty lines (continuous mode)
178 continue;
179 }
180
181 let mut len = line.len();
182 if line[len - 1] == 0x0D {
183 // 0x0D is '\r'. CouchDB >= 2.0 sends "\r\n"
184 len -= 1;
185 }
186 if line[len - 1] == 0x2C {
187 // 0x2C is ','
188 len -= 1;
189 }
190
191 let result = serde_json::from_slice::<ChangeEvent>(&line[..len])
192 .map(Event::Change)
193 .or_else(|error| {
194 serde_json::from_slice::<FinishedEvent>(&line[..len])
195 .map(Event::Finished)
196 .map_err(|_err| Error::ParsingFailed {
197 error,
198 json: String::from_utf8(line).unwrap_or_default(),
199 })
200 });
201
202 #[cfg(feature = "metrics")]
203 self.entries.increment(1);
204
205 return Poll::Ready(Some(result));
206 }
207 }
208 }
209}