// fraiseql_wire/stream/json_stream.rs
1//! JSON stream implementation
2
3use crate::protocol::BackendMessage;
4use crate::{Error, Result};
5use bytes::Bytes;
6use futures::stream::Stream;
7use serde_json::Value;
8use std::pin::Pin;
9use std::sync::atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering};
10use std::sync::Arc;
11use std::task::{Context, Poll};
12use std::time::Duration;
13use tokio::sync::{mpsc, Mutex, Notify};
14
// Lightweight state machine constants
// Used for fast state tracking without full Mutex overhead.
// These mirror the four `StreamState` variants; the terminal values are
// written by the background task via the shared `state_atomic`.
const STATE_RUNNING: u8 = 0;
const STATE_PAUSED: u8 = 1;
#[allow(dead_code)] // written by the background task, read via state snapshots
const STATE_COMPLETED: u8 = 2;
#[allow(dead_code)] // written by the background task, read via state snapshots
const STATE_FAILED: u8 = 3;
/// Stream state machine
///
/// Describes where a JSON stream currently is in its lifecycle.
/// Every stream begins in [`StreamState::Running`]; it may move to
/// [`StreamState::Paused`] and back, and eventually reaches one of the
/// terminal states ([`StreamState::Completed`] or [`StreamState::Failed`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamState {
    /// The background task is actively reading rows from Postgres.
    Running,
    /// The background task is suspended; the connection stays alive.
    Paused,
    /// The query finished normally.
    Completed,
    /// The query terminated with an error.
    Failed,
}
36
/// Stream statistics snapshot
///
/// A read-only, point-in-time view of stream progress that can be taken
/// without consuming any items from the stream.
#[derive(Debug, Clone)]
pub struct StreamStats {
    /// How many items are currently sitting in the channel buffer (0-256).
    pub items_buffered: usize,
    /// Estimated bytes consumed by the buffered items.
    pub estimated_memory: usize,
    /// Running total of rows handed to the consumer.
    pub total_rows_yielded: u64,
    /// Running total of rows dropped by Rust-side predicates.
    pub total_rows_filtered: u64,
}
52
53impl StreamStats {
54    /// Create zero-valued stats
55    ///
56    /// Useful for testing and initialization.
57    pub fn zero() -> Self {
58        Self {
59            items_buffered: 0,
60            estimated_memory: 0,
61            total_rows_yielded: 0,
62            total_rows_filtered: 0,
63        }
64    }
65}
66
67/// JSON value stream
68pub struct JsonStream {
69    receiver: mpsc::Receiver<Result<Value>>,
70    _cancel_tx: mpsc::Sender<()>,  // Dropped when stream is dropped
71    entity: String,                // Entity name for metrics
72    rows_yielded: Arc<AtomicU64>,  // Counter of items yielded to consumer
73    rows_filtered: Arc<AtomicU64>, // Counter of items filtered
74    max_memory: Option<usize>,     // Optional memory limit in bytes
75    soft_limit_fail_threshold: Option<f32>, // Fail at threshold % (0.0-1.0)
76
77    // Lightweight state tracking (cheap AtomicU8)
78    // Used for fast state checks on all queries
79    // Values: 0=Running, 1=Paused, 2=Complete, 3=Error
80    state_atomic: Arc<AtomicU8>,
81
82    // Pause/resume state machine (lazily initialized)
83    // Only allocated if pause() is called
84    pause_resume: Option<PauseResumeState>,
85
86    // Sampling counter for metrics recording (sample 1 in N polls)
87    poll_count: AtomicU64, // Counter for sampling metrics
88}
89
90/// Pause/resume state (lazily allocated)
91/// Only created when pause() is first called
92pub struct PauseResumeState {
93    state: Arc<Mutex<StreamState>>,     // Current stream state
94    pause_signal: Arc<Notify>,          // Signal to pause background task
95    resume_signal: Arc<Notify>,         // Signal to resume background task
96    paused_occupancy: Arc<AtomicUsize>, // Buffered rows when paused
97    pause_timeout: Option<Duration>,    // Optional auto-resume timeout
98}
99
100impl JsonStream {
101    /// Create new JSON stream
102    pub(crate) fn new(
103        receiver: mpsc::Receiver<Result<Value>>,
104        cancel_tx: mpsc::Sender<()>,
105        entity: String,
106        max_memory: Option<usize>,
107        _soft_limit_warn_threshold: Option<f32>,
108        soft_limit_fail_threshold: Option<f32>,
109    ) -> Self {
110        Self {
111            receiver,
112            _cancel_tx: cancel_tx,
113            entity,
114            rows_yielded: Arc::new(AtomicU64::new(0)),
115            rows_filtered: Arc::new(AtomicU64::new(0)),
116            max_memory,
117            soft_limit_fail_threshold,
118
119            // Initialize lightweight atomic state
120            state_atomic: Arc::new(AtomicU8::new(STATE_RUNNING)),
121
122            // Pause/resume lazily initialized (only if pause() is called)
123            pause_resume: None,
124
125            // Initialize sampling counter
126            poll_count: AtomicU64::new(0),
127        }
128    }
129
130    /// Initialize pause/resume state (called on first pause())
131    fn ensure_pause_resume(&mut self) -> &mut PauseResumeState {
132        if self.pause_resume.is_none() {
133            self.pause_resume = Some(PauseResumeState {
134                state: Arc::new(Mutex::new(StreamState::Running)),
135                pause_signal: Arc::new(Notify::new()),
136                resume_signal: Arc::new(Notify::new()),
137                paused_occupancy: Arc::new(AtomicUsize::new(0)),
138                pause_timeout: None,
139            });
140        }
141        self.pause_resume.as_mut().unwrap()
142    }
143
144    /// Get current stream state
145    ///
146    /// Returns the current state of the stream (Running, Paused, Completed, or Failed).
147    /// This is a synchronous getter that doesn't require awaiting.
148    pub fn state_snapshot(&self) -> StreamState {
149        // This is a best-effort snapshot that may return slightly stale state
150        // For guaranteed accurate state, use `state()` method
151        StreamState::Running // Will be updated when state machine is fully integrated
152    }
153
154    /// Get buffered rows when paused
155    ///
156    /// Returns the number of rows buffered in the channel when the stream was paused.
157    /// Only meaningful when stream is in Paused state.
158    pub fn paused_occupancy(&self) -> usize {
159        self.pause_resume
160            .as_ref()
161            .map(|pr| pr.paused_occupancy.load(Ordering::Relaxed))
162            .unwrap_or(0)
163    }
164
165    /// Set timeout for pause (auto-resume after duration)
166    ///
167    /// When a stream is paused, the background task will automatically resume
168    /// after the specified duration expires, even if resume() is not called.
169    ///
170    /// # Arguments
171    ///
172    /// * `duration` - How long to stay paused before auto-resuming
173    ///
174    /// # Examples
175    ///
176    /// ```ignore
177    /// let mut stream = client.query::<T>("entity").execute().await?;
178    /// stream.set_pause_timeout(Duration::from_secs(5));
179    /// stream.pause().await?;  // Will auto-resume after 5 seconds
180    /// ```
181    pub fn set_pause_timeout(&mut self, duration: Duration) {
182        self.ensure_pause_resume().pause_timeout = Some(duration);
183        tracing::debug!("pause timeout set to {:?}", duration);
184    }
185
186    /// Clear pause timeout (no auto-resume)
187    pub fn clear_pause_timeout(&mut self) {
188        if let Some(ref mut pr) = self.pause_resume {
189            pr.pause_timeout = None;
190            tracing::debug!("pause timeout cleared");
191        }
192    }
193
194    /// Get current pause timeout (if set)
195    pub(crate) fn pause_timeout(&self) -> Option<Duration> {
196        self.pause_resume.as_ref().and_then(|pr| pr.pause_timeout)
197    }
198
199    /// Pause the stream
200    ///
201    /// Suspends the background task from reading more data from Postgres.
202    /// The connection remains open and can be resumed later.
203    /// Buffered rows are preserved and can be consumed normally.
204    ///
205    /// This method is idempotent: calling pause() on an already-paused stream is a no-op.
206    ///
207    /// Returns an error if the stream has already completed or failed.
208    ///
209    /// # Example
210    ///
211    /// ```ignore
212    /// stream.pause().await?;
213    /// // Background task stops reading
214    /// // Consumer can still poll for remaining buffered items
215    /// ```
216    pub async fn pause(&mut self) -> Result<()> {
217        let entity = self.entity.clone();
218
219        // Update lightweight atomic state first (fast path)
220        self.state_atomic_set_paused();
221
222        let pr = self.ensure_pause_resume();
223        let mut state = pr.state.lock().await;
224
225        match *state {
226            StreamState::Running => {
227                // Signal background task to pause
228                pr.pause_signal.notify_one();
229                // Update state
230                *state = StreamState::Paused;
231
232                // Record metric
233                crate::metrics::counters::stream_paused(&entity);
234                Ok(())
235            }
236            StreamState::Paused => {
237                // Idempotent: already paused
238                Ok(())
239            }
240            StreamState::Completed | StreamState::Failed => {
241                // Cannot pause a terminal stream
242                Err(Error::Protocol(
243                    "cannot pause a completed or failed stream".to_string(),
244                ))
245            }
246        }
247    }
248
249    /// Resume the stream
250    ///
251    /// Resumes the background task to continue reading data from Postgres.
252    /// Only has an effect if the stream is currently paused.
253    ///
254    /// This method is idempotent: calling resume() before pause() or on an
255    /// already-running stream is a no-op.
256    ///
257    /// Returns an error if the stream has already completed or failed.
258    ///
259    /// # Example
260    ///
261    /// ```ignore
262    /// stream.resume().await?;
263    /// // Background task resumes reading
264    /// // Consumer can poll for more items
265    /// ```
266    pub async fn resume(&mut self) -> Result<()> {
267        // Update lightweight atomic state first (fast path)
268        // Check atomic state before borrowing pause_resume
269        let current = self.state_atomic_get();
270
271        // Resume only makes sense if pause/resume was initialized
272        if let Some(ref mut pr) = self.pause_resume {
273            let entity = self.entity.clone();
274
275            // Note: Set to RUNNING to reflect resumed state
276            if current == STATE_PAUSED {
277                // Only update atomic if currently paused
278                self.state_atomic.store(STATE_RUNNING, Ordering::Release);
279            }
280
281            let mut state = pr.state.lock().await;
282
283            match *state {
284                StreamState::Paused => {
285                    // Signal background task to resume
286                    pr.resume_signal.notify_one();
287                    // Update state
288                    *state = StreamState::Running;
289
290                    // Record metric
291                    crate::metrics::counters::stream_resumed(&entity);
292                    Ok(())
293                }
294                StreamState::Running => {
295                    // Idempotent: already running (or resume before pause)
296                    Ok(())
297                }
298                StreamState::Completed | StreamState::Failed => {
299                    // Cannot resume a terminal stream
300                    Err(Error::Protocol(
301                        "cannot resume a completed or failed stream".to_string(),
302                    ))
303                }
304            }
305        } else {
306            // No pause/resume infrastructure (never paused), idempotent no-op
307            Ok(())
308        }
309    }
310
311    /// Pause the stream with a diagnostic reason
312    ///
313    /// Like pause(), but logs the provided reason for diagnostic purposes.
314    /// This helps track why streams are being paused (e.g., "backpressure",
315    /// "maintenance", "rate limit").
316    ///
317    /// # Arguments
318    ///
319    /// * `reason` - Optional reason for pausing (logged at debug level)
320    ///
321    /// # Example
322    ///
323    /// ```ignore
324    /// stream.pause_with_reason("backpressure: consumer busy").await?;
325    /// ```
326    pub async fn pause_with_reason(&mut self, reason: &str) -> Result<()> {
327        tracing::debug!("pausing stream: {}", reason);
328        self.pause().await
329    }
330
331    /// Clone internal state for passing to background task (only if pause/resume is initialized)
332    pub(crate) fn clone_state(&self) -> Option<Arc<Mutex<StreamState>>> {
333        self.pause_resume.as_ref().map(|pr| Arc::clone(&pr.state))
334    }
335
336    /// Clone pause signal for passing to background task (only if pause/resume is initialized)
337    pub(crate) fn clone_pause_signal(&self) -> Option<Arc<Notify>> {
338        self.pause_resume
339            .as_ref()
340            .map(|pr| Arc::clone(&pr.pause_signal))
341    }
342
343    /// Clone resume signal for passing to background task (only if pause/resume is initialized)
344    pub(crate) fn clone_resume_signal(&self) -> Option<Arc<Notify>> {
345        self.pause_resume
346            .as_ref()
347            .map(|pr| Arc::clone(&pr.resume_signal))
348    }
349
350    // =========================================================================
351    // Lightweight state machine methods
352    // =========================================================================
353
354    /// Clone atomic state for passing to background task
355    pub(crate) fn clone_state_atomic(&self) -> Arc<AtomicU8> {
356        Arc::clone(&self.state_atomic)
357    }
358
359    /// Get current state from atomic (fast path, no locks)
360    pub(crate) fn state_atomic_get(&self) -> u8 {
361        self.state_atomic.load(Ordering::Acquire)
362    }
363
364    /// Set state to paused using atomic
365    pub(crate) fn state_atomic_set_paused(&self) {
366        self.state_atomic.store(STATE_PAUSED, Ordering::Release);
367    }
368
369    /// Get current stream statistics
370    ///
371    /// Returns a snapshot of stream state without consuming any items.
372    /// This can be called at any time to monitor progress.
373    ///
374    /// # Example
375    ///
376    /// ```ignore
377    /// let stats = stream.stats();
378    /// println!("Buffered: {}, Yielded: {}", stats.items_buffered, stats.total_rows_yielded);
379    /// ```
380    pub fn stats(&self) -> StreamStats {
381        let items_buffered = self.receiver.len();
382        let estimated_memory = items_buffered * 2048; // Conservative: 2KB per item
383        let total_rows_yielded = self.rows_yielded.load(Ordering::Relaxed);
384        let total_rows_filtered = self.rows_filtered.load(Ordering::Relaxed);
385
386        StreamStats {
387            items_buffered,
388            estimated_memory,
389            total_rows_yielded,
390            total_rows_filtered,
391        }
392    }
393
394    /// Increment rows yielded counter (called from FilteredStream)
395    #[allow(unused)]
396    pub(crate) fn increment_rows_yielded(&self, count: u64) {
397        self.rows_yielded.fetch_add(count, Ordering::Relaxed);
398    }
399
400    /// Increment rows filtered counter (called from FilteredStream)
401    #[allow(unused)]
402    pub(crate) fn increment_rows_filtered(&self, count: u64) {
403        self.rows_filtered.fetch_add(count, Ordering::Relaxed);
404    }
405
406    /// Clone the yielded counter for passing to background task
407    #[allow(unused)]
408    pub(crate) fn clone_rows_yielded(&self) -> Arc<AtomicU64> {
409        Arc::clone(&self.rows_yielded)
410    }
411
412    /// Clone the filtered counter for passing to background task
413    #[allow(unused)]
414    pub(crate) fn clone_rows_filtered(&self) -> Arc<AtomicU64> {
415        Arc::clone(&self.rows_filtered)
416    }
417}
418
419impl Stream for JsonStream {
420    type Item = Result<Value>;
421
422    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
423        // Sample metrics: record 1 in every 1000 polls to avoid hot path overhead
424        // For 100K rows, this records ~100 times instead of 100K times
425        let poll_idx = self.poll_count.fetch_add(1, Ordering::Relaxed);
426        if poll_idx % 1000 == 0 {
427            let occupancy = self.receiver.len() as u64;
428            crate::metrics::histograms::channel_occupancy(&self.entity, occupancy);
429            crate::metrics::gauges::stream_buffered_items(&self.entity, occupancy as usize);
430        }
431
432        // Check memory limit BEFORE receiving (pre-enqueue strategy)
433        // This stops consuming when buffer reaches limit
434        if let Some(limit) = self.max_memory {
435            let items_buffered = self.receiver.len();
436            let estimated_memory = items_buffered * 2048; // Conservative: 2KB per item
437
438            // Check soft limit thresholds first (warn before fail)
439            if let Some(fail_threshold) = self.soft_limit_fail_threshold {
440                let threshold_bytes = (limit as f32 * fail_threshold) as usize;
441                if estimated_memory > threshold_bytes {
442                    // Record metric for memory limit exceeded
443                    crate::metrics::counters::memory_limit_exceeded(&self.entity);
444                    return Poll::Ready(Some(Err(Error::MemoryLimitExceeded {
445                        limit,
446                        estimated_memory,
447                    })));
448                }
449            } else if estimated_memory > limit {
450                // Hard limit (no soft limits configured)
451                crate::metrics::counters::memory_limit_exceeded(&self.entity);
452                return Poll::Ready(Some(Err(Error::MemoryLimitExceeded {
453                    limit,
454                    estimated_memory,
455                })));
456            }
457
458            // Note: Warn threshold would be handled by instrumentation/logging layer
459            // This is for application-level monitoring, not a hard error
460        }
461
462        self.receiver.poll_recv(cx)
463    }
464}
465
466/// Extract JSON bytes from DataRow message
467pub fn extract_json_bytes(msg: &BackendMessage) -> Result<Bytes> {
468    match msg {
469        BackendMessage::DataRow(fields) => {
470            if fields.len() != 1 {
471                return Err(Error::Protocol(format!(
472                    "expected 1 field, got {}",
473                    fields.len()
474                )));
475            }
476
477            let field = &fields[0];
478            field
479                .clone()
480                .ok_or_else(|| Error::Protocol("null data field".into()))
481        }
482        _ => Err(Error::Protocol("expected DataRow".into())),
483    }
484}
485
486/// Parse JSON bytes into Value
487pub fn parse_json(data: Bytes) -> Result<Value> {
488    let value: Value = serde_json::from_slice(&data)?;
489    Ok(value)
490}
491
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_extract_json_bytes() {
        // A single-field DataRow yields its payload unchanged.
        let payload = Bytes::from_static(b"{\"key\":\"value\"}");
        let row = BackendMessage::DataRow(vec![Some(payload.clone())]);
        assert_eq!(extract_json_bytes(&row).unwrap(), payload);
    }

    #[test]
    fn test_extract_null_field() {
        // A NULL field is rejected as a protocol error.
        let row = BackendMessage::DataRow(vec![None]);
        assert!(extract_json_bytes(&row).is_err());
    }

    #[test]
    fn test_parse_json() {
        let value = parse_json(Bytes::from_static(b"{\"key\":\"value\"}")).unwrap();
        assert_eq!(value["key"], "value");
    }

    #[test]
    fn test_parse_invalid_json() {
        assert!(parse_json(Bytes::from_static(b"not json")).is_err());
    }

    #[test]
    fn test_stream_stats_creation() {
        let zeroed = StreamStats::zero();
        assert_eq!(zeroed.items_buffered, 0);
        assert_eq!(zeroed.estimated_memory, 0);
        assert_eq!(zeroed.total_rows_yielded, 0);
        assert_eq!(zeroed.total_rows_filtered, 0);
    }

    #[test]
    fn test_stream_stats_memory_estimation() {
        // 100 items at the conservative 2KB-per-item estimate = 200KB.
        let stats = StreamStats {
            items_buffered: 100,
            estimated_memory: 100 * 2048,
            total_rows_yielded: 100,
            total_rows_filtered: 10,
        };
        assert_eq!(stats.estimated_memory, 204800);
    }

    #[test]
    fn test_stream_stats_clone() {
        let original = StreamStats {
            items_buffered: 50,
            estimated_memory: 100000,
            total_rows_yielded: 500,
            total_rows_filtered: 50,
        };

        let duplicate = original.clone();
        assert_eq!(duplicate.items_buffered, original.items_buffered);
        assert_eq!(duplicate.estimated_memory, original.estimated_memory);
        assert_eq!(duplicate.total_rows_yielded, original.total_rows_yielded);
        assert_eq!(duplicate.total_rows_filtered, original.total_rows_filtered);
    }
}