kaccy_db/query_streaming.rs

//! Query result streaming for memory-efficient iteration over large result sets.
//!
//! This module provides:
//! - Async streams for large result sets
//! - Backpressure handling
//! - Memory-efficient iteration

use futures::{Stream, StreamExt, TryStreamExt};
use sqlx::{FromRow, PgPool};

use crate::error::Result;

/// Configuration for query streaming.
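///
/// A construction sketch (the values shown are illustrative, not tuned
/// recommendations):
///
/// ```rust,ignore
/// let config = StreamConfig {
///     fetch_size: 500,
///     ..StreamConfig::default()
/// };
/// ```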
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// Batch size for fetching rows
    pub fetch_size: usize,
    /// Maximum number of concurrent fetches
    pub max_concurrent_fetches: usize,
}

impl Default for StreamConfig {
    fn default() -> Self {
        Self {
            fetch_size: 100,
            max_concurrent_fetches: 2,
        }
    }
}

/// Query builder for streaming results.
///
/// This is a lightweight wrapper that helps construct streaming queries.
/// It deliberately does not implement `Stream` itself: sqlx's `fetch` borrows
/// the query string, so a returned stream would borrow from this struct and
/// run into lifetime issues. Each method instead consumes `self` and drives
/// the stream internally.
pub struct QueryStream<T> {
    pool: PgPool,
    query: String,
    _phantom: std::marker::PhantomData<T>,
}

impl<T> QueryStream<T>
where
    T: Send + 'static,
{
    /// Create a new query stream builder.
    ///
    /// The pool is cloned (cheap, as `PgPool` uses an `Arc` internally).
    pub fn new(pool: &PgPool, query: impl Into<String>) -> Self {
        Self {
            pool: pool.clone(),
            query: query.into(),
            _phantom: std::marker::PhantomData,
        }
    }

    /// Execute the query and collect all results into a `Vec`.
    ///
    /// Recommended when the full result set comfortably fits in memory;
    /// for larger result sets, prefer [`Self::for_each`].
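    ///
    /// A usage sketch, assuming a hypothetical `users` table and a matching
    /// `User` row type (marked `ignore` so it is not compiled as a doctest):
    ///
    /// ```rust,ignore
    /// #[derive(sqlx::FromRow)]
    /// struct User {
    ///     id: i64,
    ///     name: String,
    /// }
    ///
    /// let users: Vec<User> = QueryStream::new(&pool, "SELECT id, name FROM users")
    ///     .collect_vec()
    ///     .await?;
    /// ```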
    pub async fn collect_vec(self) -> Result<Vec<T>>
    where
        T: for<'r> FromRow<'r, sqlx::postgres::PgRow> + Unpin,
    {
        let results: Vec<T> = sqlx::query_as(&self.query)
            .fetch(&self.pool)
            .try_collect()
            .await?;

        Ok(results)
    }

    /// Execute the query and process results one by one with a processor function.
    ///
    /// Useful for processing large result sets without loading everything
    /// into memory: rows are decoded as the stream is consumed.
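    ///
    /// A usage sketch, reusing the hypothetical `User` type from
    /// [`Self::collect_vec`] (not compiled as a doctest):
    ///
    /// ```rust,ignore
    /// let count = QueryStream::<User>::new(&pool, "SELECT id, name FROM users")
    ///     .for_each(|user| async move {
    ///         println!("{}: {}", user.id, user.name);
    ///         Ok(())
    ///     })
    ///     .await?;
    /// ```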
    pub async fn for_each<F, Fut>(self, mut processor: F) -> Result<u64>
    where
        T: for<'r> FromRow<'r, sqlx::postgres::PgRow> + Unpin,
        F: FnMut(T) -> Fut,
        Fut: std::future::Future<Output = Result<()>>,
    {
        let mut stream = sqlx::query_as::<_, T>(&self.query).fetch(&self.pool);
        let mut count = 0u64;

        while let Some(row) = stream.next().await {
            let row = row?;
            processor(row).await?;
            count += 1;
        }

        Ok(count)
    }
}

/// Buffered stream processor for handling backpressure.
///
/// Items are pulled from the stream into a fixed-size buffer and processed
/// in batches; the stream is not polled while a batch is being drained, which
/// naturally backpressures the producer.
pub struct BufferedStreamProcessor<T> {
    buffer_size: usize,
    _phantom: std::marker::PhantomData<T>,
}

impl<T> BufferedStreamProcessor<T> {
    /// Create a new buffered stream processor.
    pub fn new(buffer_size: usize) -> Self {
        Self {
            buffer_size,
            _phantom: std::marker::PhantomData,
        }
    }

    /// Process a stream in batches of `buffer_size`, returning the number of
    /// items processed.
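    ///
    /// A minimal in-memory sketch using `futures::stream::iter` (not
    /// compiled as a doctest):
    ///
    /// ```rust,ignore
    /// let stream = futures::stream::iter((1..=5).map(Ok));
    /// let processor = BufferedStreamProcessor::<i32>::new(2);
    /// let processed = processor
    ///     .process(stream, |n| async move {
    ///         println!("{n}");
    ///         Ok(())
    ///     })
    ///     .await?;
    /// assert_eq!(processed, 5);
    /// ```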
    pub async fn process<S, F, Fut>(&self, mut stream: S, mut processor: F) -> Result<u64>
    where
        S: Stream<Item = Result<T>> + Unpin,
        F: FnMut(T) -> Fut,
        Fut: std::future::Future<Output = Result<()>>,
        T: Send,
    {
        let mut processed = 0u64;
        let mut buffer = Vec::with_capacity(self.buffer_size);

        while let Some(item) = stream.next().await {
            let item = item?;
            buffer.push(item);

            // Drain the buffer once it reaches capacity; the stream is not
            // polled again until the whole batch has been processed.
            if buffer.len() >= self.buffer_size {
                for item in buffer.drain(..) {
                    processor(item).await?;
                    processed += 1;
                }
            }
        }

        // Process any remaining items in a final partial batch
        for item in buffer {
            processor(item).await?;
            processed += 1;
        }

        Ok(processed)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_stream_config_default() {
        let config = StreamConfig::default();
        assert_eq!(config.fetch_size, 100);
        assert_eq!(config.max_concurrent_fetches, 2);
    }

    #[test]
    fn test_buffered_stream_processor_creation() {
        let processor = BufferedStreamProcessor::<i32>::new(50);
        assert_eq!(processor.buffer_size, 50);
    }
159}