kaccy_db/
query_streaming.rs1use futures::{Stream, StreamExt as FuturesStreamExt};
9use sqlx::{FromRow, PgPool};
10
11use crate::error::Result;
12
/// Tuning knobs for streamed query execution.
///
/// NOTE(review): nothing visible in this file reads these fields yet; the
/// per-field descriptions below are inferred from the names and should be
/// confirmed against the callers.
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// Presumably the number of rows requested per fetch — TODO confirm.
    pub fetch_size: usize,
    /// Presumably the upper bound on fetches in flight at once — TODO confirm.
    pub max_concurrent_fetches: usize,
}

impl Default for StreamConfig {
    /// Returns the baseline configuration: a `fetch_size` of 100 and
    /// `max_concurrent_fetches` of 2.
    fn default() -> Self {
        const DEFAULT_FETCH_SIZE: usize = 100;
        const DEFAULT_MAX_CONCURRENT_FETCHES: usize = 2;

        StreamConfig {
            fetch_size: DEFAULT_FETCH_SIZE,
            max_concurrent_fetches: DEFAULT_MAX_CONCURRENT_FETCHES,
        }
    }
}
30
31pub struct QueryStream<T> {
36 pool: PgPool,
37 query: String,
38 _phantom: std::marker::PhantomData<T>,
39}
40
41impl<T> QueryStream<T>
42where
43 T: Send + 'static,
44{
45 pub fn new(pool: &PgPool, query: impl Into<String>) -> Self {
49 Self {
50 pool: pool.clone(),
51 query: query.into(),
52 _phantom: std::marker::PhantomData,
53 }
54 }
55
56 pub async fn collect_vec(self) -> Result<Vec<T>>
60 where
61 T: for<'r> FromRow<'r, sqlx::postgres::PgRow> + Unpin,
62 {
63 use futures::TryStreamExt;
64
65 let results: Vec<T> = sqlx::query_as(&self.query)
66 .fetch(&self.pool)
67 .try_collect()
68 .await?;
69
70 Ok(results)
71 }
72
73 pub async fn for_each<F, Fut>(self, mut processor: F) -> Result<u64>
77 where
78 T: for<'r> FromRow<'r, sqlx::postgres::PgRow> + Unpin,
79 F: FnMut(T) -> Fut,
80 Fut: std::future::Future<Output = Result<()>>,
81 {
82 let mut stream = sqlx::query_as::<_, T>(&self.query).fetch(&self.pool);
83 let mut count = 0u64;
84
85 while let Some(row) = FuturesStreamExt::next(&mut stream).await {
86 let row = row?;
87 processor(row).await?;
88 count += 1;
89 }
90
91 Ok(count)
92 }
93}
94
95pub struct BufferedStreamProcessor<T> {
97 buffer_size: usize,
98 _phantom: std::marker::PhantomData<T>,
99}
100
101impl<T> BufferedStreamProcessor<T> {
102 pub fn new(buffer_size: usize) -> Self {
104 Self {
105 buffer_size,
106 _phantom: std::marker::PhantomData,
107 }
108 }
109
110 pub async fn process<S, F, Fut>(&self, mut stream: S, mut processor: F) -> Result<u64>
112 where
113 S: Stream<Item = Result<T>> + Unpin,
114 F: FnMut(T) -> Fut,
115 Fut: std::future::Future<Output = Result<()>>,
116 T: Send,
117 {
118 let mut processed = 0u64;
119 let mut buffer = Vec::with_capacity(self.buffer_size);
120
121 while let Some(item) = FuturesStreamExt::next(&mut stream).await {
122 let item = item?;
123 buffer.push(item);
124
125 if buffer.len() >= self.buffer_size {
126 for item in buffer.drain(..) {
127 processor(item).await?;
128 processed += 1;
129 }
130 }
131 }
132
133 for item in buffer {
135 processor(item).await?;
136 processed += 1;
137 }
138
139 Ok(processed)
140 }
141}
142
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration exposes the documented baseline values.
    #[test]
    fn test_stream_config_default() {
        let StreamConfig {
            fetch_size,
            max_concurrent_fetches,
        } = StreamConfig::default();

        assert_eq!(fetch_size, 100);
        assert_eq!(max_concurrent_fetches, 2);
    }

    /// The constructor stores the requested buffer size unchanged.
    #[test]
    fn test_buffered_stream_processor_creation() {
        let requested = 50;
        let BufferedStreamProcessor { buffer_size, .. } =
            BufferedStreamProcessor::<i32>::new(requested);

        assert_eq!(buffer_size, 50);
    }
}