changes_stream2/lib.rs
//! The `changes_stream2` crate is designed to give you a
//! `futures::Stream` of CouchDB changes stream events.
3
4use bytes::{buf::IntoIter, Bytes};
5use futures_util::stream::Stream;
6use log::error;
7use serde_json::Value;
8use std::{mem::replace, pin::Pin, task::Poll};
9
10mod error;
11mod event;
12pub use error::Error;
13pub use event::{Change, ChangeEvent, Event, FinishedEvent};
14
15/// A structure which implements futures::Stream
16pub struct ChangesStream {
17 /// metrics bytes counter
18 #[cfg(feature = "metrics")]
19 bytes: metrics::Counter,
20 /// metrics entries counter
21 #[cfg(feature = "metrics")]
22 entries: metrics::Counter,
23 /// Source of http chunks provided by reqwest
24 source: Pin<Box<dyn Stream<Item = reqwest::Result<Bytes>> + Send>>,
25 /// Buffer of current line and current chunk iterator
26 buf: (Vec<u8>, Option<IntoIter<Bytes>>),
27}
28
29impl ChangesStream {
30 /// Constructs a new `ChangesStream` struct with a post payload
31 ///
32 /// Takes a single argument, `db`, which represents the
33 /// url of the data you wish to stream.
34 ///
35 /// For example, to create a new `ChangesStream` struct
36 /// for the npmjs registry, you would write:
37 ///
38 /// ```no_run
39 /// # use changes_stream2::{ChangesStream, Event};
40 /// # use futures_util::stream::StreamExt;
41 /// #
42 /// # #[tokio::main]
43 /// # async fn main() {
44 /// # let url = "https://replicate.npmjs.com/_changes?filter=_selector".to_string();
45 /// # let mut changes = ChangesStream::with_post_payload(url, &serde_json::json!({"selector": { "_id": { "$regex": "^_design/" }}})).await.unwrap();
46 /// # while let Some(event) = changes.next().await {
47 /// # match event {
48 /// # Ok(Event::Change(change)) => println!("Change ({}): {}", change.seq, change.id),
49 /// # Ok(Event::Finished(finished)) => println!("Finished: {}", finished.last_seq),
50 /// # Err(err) => println!("Error: {:?}", err),
51 /// # }
52 /// # }
53 /// # }
54 /// ```
55 pub async fn with_post_payload(db: String, payload: &Value) -> Result<Self, Error> {
56 let url = url::Url::parse(&db)?;
57 #[cfg(feature = "metrics")]
58 let database = regex::Regex::new(r"(?m)[_/]+")
59 .unwrap()
60 .replace_all(
61 &format!("{}_{}", url.host_str().unwrap_or_default(), url.path()),
62 "_",
63 )
64 .to_string();
65
66 let client = reqwest::Client::new();
67 let res = client.post(url).json(payload).send().await?;
68 let status = res.status();
69 if !status.is_success() {
70 let body = res.text().await.unwrap_or_default();
71 return Err(Error::InvalidResponse { status, body });
72 }
73 let source = Pin::new(Box::new(res.bytes_stream()));
74
75 #[cfg(feature = "metrics")]
76 let (bytes, entries) = {
77 let bytes_name = "couchdb_changes_bytes_total";
78 let entries_name = "couchdb_changes_entries_total";
79 metrics::describe_counter!(bytes_name, metrics::Unit::Bytes, "Changes stream bytes");
80 metrics::describe_counter!(
81 entries_name,
82 metrics::Unit::Count,
83 "Changes stream entries"
84 );
85 (
86 metrics::counter!(bytes_name, "database" => database.clone()),
87 metrics::counter!(entries_name, "database" => database),
88 )
89 };
90
91 Ok(Self {
92 #[cfg(feature = "metrics")]
93 bytes,
94 #[cfg(feature = "metrics")]
95 entries,
96 source,
97 buf: (Vec::new(), None),
98 })
99 }
100
101 /// Constructs a new `ChangesStream` struct
102 ///
103 /// Takes a single argument, `db`, which represents the
104 /// url of the data you wish to stream.
105 ///
106 /// For example, to create a new `ChangesStream` struct
107 /// for the npmjs registry, you would write:
108 ///
109 /// ```no_run
110 /// # use changes_stream2::{ChangesStream, Event};
111 /// # use futures_util::stream::StreamExt;
112 /// #
113 /// # #[tokio::main]
114 /// # async fn main() {
115 /// # let url = "https://replicate.npmjs.com/_changes".to_string();
116 /// # let mut changes = ChangesStream::new(url).await.unwrap();
117 /// # while let Some(event) = changes.next().await {
118 /// # match event {
119 /// # Ok(Event::Change(change)) => println!("Change ({}): {}", change.seq, change.id),
120 /// # Ok(Event::Finished(finished)) => println!("Finished: {}", finished.last_seq),
121 /// # Err(err) => println!("Error: {:?}", err),
122 /// # }
123 /// # }
124 /// # }
125 /// ```
126 pub async fn new(db: String) -> Result<Self, Error> {
127 Self::with_post_payload(db, &serde_json::json!({})).await
128 }
129}
130
131impl Stream for ChangesStream {
132 type Item = Result<Event, Error>;
133
134 fn poll_next(
135 mut self: std::pin::Pin<&mut Self>,
136 cx: &mut std::task::Context<'_>,
137 ) -> std::task::Poll<Option<Self::Item>> {
138 'main: loop {
139 if self.buf.1.is_none() {
140 match Stream::poll_next(self.source.as_mut(), cx) {
141 Poll::Pending => return Poll::Pending,
142 Poll::Ready(None) => return Poll::Ready(None),
143 Poll::Ready(Some(Ok(chunk))) => {
144 #[cfg(feature = "metrics")]
145 self.bytes.increment(chunk.len() as u64);
146
147 self.buf.1 = Some(chunk.into_iter())
148 }
149 Poll::Ready(Some(Err(err))) => {
150 error!("Error getting next chunk: {:?}", err);
151 return Poll::Ready(None);
152 }
153 }
154 } else {
155 let (line, chunk_iter) = &mut self.buf;
156 let iter = chunk_iter.as_mut().unwrap();
157
158 loop {
159 if let Some(byte) = iter.next() {
160 if byte == 0x0A {
161 // Found '\n', end of line
162 break;
163 }
164 line.push(byte);
165 } else {
166 // We need another chunk to fill the line
167 *chunk_iter = None;
168 continue 'main;
169 }
170 }
171
172 let line = replace(line, Vec::with_capacity(line.len() * 2));
173 if line.len() < 14 {
174 // skip prologue, epilogue and empty lines (continuous mode)
175 continue;
176 }
177
178 let mut len = line.len();
179 if line[len - 1] == 0x0D {
180 // 0x0D is '\r'. CouchDB >= 2.0 sends "\r\n"
181 len -= 1;
182 }
183 if line[len - 1] == 0x2C {
184 // 0x2C is ','
185 len -= 1;
186 }
187
188 let result = serde_json::from_slice::<ChangeEvent>(&line[..len])
189 .map(Event::Change)
190 .or_else(|error| {
191 serde_json::from_slice::<FinishedEvent>(&line[..len])
192 .map(Event::Finished)
193 .map_err(|_err| Error::ParsingFailed {
194 error,
195 json: String::from_utf8(line).unwrap_or_default(),
196 })
197 });
198
199 #[cfg(feature = "metrics")]
200 self.entries.increment(1);
201
202 return Poll::Ready(Some(result));
203 }
204 }
205 }
206}