Skip to main content

fraiseql_wire/stream/json_stream/
mod.rs

1//! JSON stream implementation
2
3use crate::protocol::BackendMessage;
4use crate::{Result, WireError};
5use bytes::Bytes;
6use futures::stream::Stream;
7use serde_json::Value;
8use std::pin::Pin;
9use std::sync::atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering};
10use std::sync::Arc;
11use std::task::{Context, Poll};
12use std::time::Duration;
13use tokio::sync::{mpsc, Mutex, Notify};
14
// Raw `u8` encodings of the stream state.
// They are stored in an `AtomicU8` so the state can be checked without
// taking a `Mutex`.

/// Atomic state value: stream is running.
pub const STATE_RUNNING: u8 = 0;
/// Atomic state value: stream is paused.
pub const STATE_PAUSED: u8 = 1;
/// Atomic state value: stream completed.
pub const STATE_COMPLETED: u8 = 2;
/// Atomic state value: stream failed.
pub const STATE_FAILED: u8 = 3;
21
/// Lifecycle state of a JSON stream.
///
/// A stream begins as [`StreamState::Running`], may move to
/// [`StreamState::Paused`], and ends in one of the terminal states
/// [`StreamState::Completed`] or [`StreamState::Failed`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum StreamState {
    /// The background task is actively reading from Postgres.
    Running,
    /// The background task is suspended; the connection stays alive.
    Paused,
    /// The query finished normally.
    Completed,
    /// The query ended with an error.
    Failed,
}
39
/// Point-in-time statistics for a stream.
///
/// A read-only snapshot: taking one does not consume any items, and every
/// value reflects the moment the snapshot was taken.
#[derive(Debug, Clone)]
pub struct StreamStats {
    /// Items currently sitting in the channel buffer (0-256).
    pub items_buffered: usize,
    /// Estimated bytes of memory held by the buffered items.
    pub estimated_memory: usize,
    /// Rows handed to the consumer so far.
    pub total_rows_yielded: u64,
    /// Rows dropped by Rust-side predicates so far.
    pub total_rows_filtered: u64,
}

impl StreamStats {
    /// Return statistics with every counter set to zero.
    ///
    /// Handy as an initial value and in tests.
    #[must_use]
    pub const fn zero() -> Self {
        const ALL_ZERO: StreamStats = StreamStats {
            items_buffered: 0,
            estimated_memory: 0,
            total_rows_yielded: 0,
            total_rows_filtered: 0,
        };
        ALL_ZERO
    }
}
70
71/// JSON value stream
72pub struct JsonStream {
73    receiver: mpsc::Receiver<Result<Value>>,
74    _cancel_tx: mpsc::Sender<()>,  // Dropped when stream is dropped
75    entity: String,                // Entity name for metrics
76    rows_yielded: Arc<AtomicU64>,  // Counter of items yielded to consumer
77    rows_filtered: Arc<AtomicU64>, // Counter of items filtered
78    max_memory: Option<usize>,     // Optional memory limit in bytes
79    soft_limit_fail_threshold: Option<f32>, // Fail at threshold % (0.0-1.0)
80
81    // Lightweight state tracking (cheap AtomicU8)
82    // Used for fast state checks on all queries
83    // Values: 0=Running, 1=Paused, 2=Complete, 3=Error
84    state_atomic: Arc<AtomicU8>,
85
86    // Pause/resume state machine (lazily initialized)
87    // Only allocated if pause() is called
88    pause_resume: Option<PauseResumeState>,
89
90    // Sampling counter for metrics recording (sample 1 in N polls)
91    poll_count: AtomicU64, // Counter for sampling metrics
92}
93
94/// Pause/resume state (lazily allocated)
95/// Only created when `pause()` is first called
96pub struct PauseResumeState {
97    state: Arc<Mutex<StreamState>>,     // Current stream state
98    pause_signal: Arc<Notify>,          // Signal to pause background task
99    resume_signal: Arc<Notify>,         // Signal to resume background task
100    paused_occupancy: Arc<AtomicUsize>, // Buffered rows when paused
101    pause_timeout: Option<Duration>,    // Optional auto-resume timeout
102}
103
104impl JsonStream {
105    /// Create new JSON stream
106    pub(crate) fn new(
107        receiver: mpsc::Receiver<Result<Value>>,
108        cancel_tx: mpsc::Sender<()>,
109        entity: String,
110        max_memory: Option<usize>,
111        _soft_limit_warn_threshold: Option<f32>,
112        soft_limit_fail_threshold: Option<f32>,
113    ) -> Self {
114        Self {
115            receiver,
116            _cancel_tx: cancel_tx,
117            entity,
118            rows_yielded: Arc::new(AtomicU64::new(0)),
119            rows_filtered: Arc::new(AtomicU64::new(0)),
120            max_memory,
121            soft_limit_fail_threshold,
122
123            // Initialize lightweight atomic state
124            state_atomic: Arc::new(AtomicU8::new(STATE_RUNNING)),
125
126            // Pause/resume lazily initialized (only if pause() is called)
127            pause_resume: None,
128
129            // Initialize sampling counter
130            poll_count: AtomicU64::new(0),
131        }
132    }
133
134    /// Initialize pause/resume state (called on first `pause()`)
135    fn ensure_pause_resume(&mut self) -> &mut PauseResumeState {
136        if self.pause_resume.is_none() {
137            self.pause_resume = Some(PauseResumeState {
138                state: Arc::new(Mutex::new(StreamState::Running)),
139                pause_signal: Arc::new(Notify::new()),
140                resume_signal: Arc::new(Notify::new()),
141                paused_occupancy: Arc::new(AtomicUsize::new(0)),
142                pause_timeout: None,
143            });
144        }
145        self.pause_resume
146            .as_mut()
147            .expect("pause_resume initialized in the block above")
148    }
149
150    /// Get current stream state
151    ///
152    /// Returns the current state of the stream (Running, Paused, Completed, or Failed).
153    /// This is a synchronous getter that doesn't require awaiting.
154    ///
155    /// Note: This is a best-effort snapshot that may return slightly stale state
156    /// due to the non-blocking nature of atomic reads.
157    pub fn state_snapshot(&self) -> StreamState {
158        // Read from lightweight atomic state (fast path, no locks)
159        match self.state_atomic.load(Ordering::Acquire) {
160            STATE_RUNNING => StreamState::Running,
161            STATE_PAUSED => StreamState::Paused,
162            STATE_COMPLETED => StreamState::Completed,
163            STATE_FAILED => StreamState::Failed,
164            _ => {
165                // Unknown state - fall back to checking if channel is closed
166                if self.receiver.is_closed() {
167                    StreamState::Completed
168                } else {
169                    StreamState::Running
170                }
171            }
172        }
173    }
174
175    /// Get buffered rows when paused
176    ///
177    /// Returns the number of rows buffered in the channel when the stream was paused.
178    /// Only meaningful when stream is in Paused state.
179    pub fn paused_occupancy(&self) -> usize {
180        self.pause_resume
181            .as_ref()
182            .map_or(0, |pr| pr.paused_occupancy.load(Ordering::Relaxed))
183    }
184
185    /// Set timeout for pause (auto-resume after duration)
186    ///
187    /// When a stream is paused, the background task will automatically resume
188    /// after the specified duration expires, even if `resume()` is not called.
189    ///
190    /// # Arguments
191    ///
192    /// * `duration` - How long to stay paused before auto-resuming
193    ///
194    /// # Examples
195    ///
196    /// ```text
197    /// // Requires: a JsonStream instance from execute_query.
198    /// // Note: set_pause_timeout is on JsonStream, not QueryStream.
199    /// // Use FraiseClient::execute_query (internal) to obtain a JsonStream directly.
200    /// use std::time::Duration;
201    /// stream.set_pause_timeout(Duration::from_secs(5));
202    /// stream.pause().await?;  // Will auto-resume after 5 seconds
203    /// ```
204    pub fn set_pause_timeout(&mut self, duration: Duration) {
205        self.ensure_pause_resume().pause_timeout = Some(duration);
206        tracing::debug!("pause timeout set to {:?}", duration);
207    }
208
209    /// Clear pause timeout (no auto-resume)
210    pub fn clear_pause_timeout(&mut self) {
211        if let Some(ref mut pr) = self.pause_resume {
212            pr.pause_timeout = None;
213            tracing::debug!("pause timeout cleared");
214        }
215    }
216
217    /// Get current pause timeout (if set)
218    pub(crate) fn pause_timeout(&self) -> Option<Duration> {
219        self.pause_resume.as_ref().and_then(|pr| pr.pause_timeout)
220    }
221
222    /// Pause the stream
223    ///
224    /// Suspends the background task from reading more data from Postgres.
225    /// The connection remains open and can be resumed later.
226    /// Buffered rows are preserved and can be consumed normally.
227    ///
228    /// This method is idempotent: calling `pause()` on an already-paused stream is a no-op.
229    ///
230    /// Returns an error if the stream has already completed or failed.
231    ///
232    /// # Errors
233    ///
234    /// Returns [`WireError::Protocol`] if the stream is in a terminal state (completed or failed).
235    ///
236    /// # Example
237    ///
238    /// ```no_run
239    /// // Requires: live Postgres streaming connection.
240    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
241    /// let mut stream = client.query::<serde_json::Value>("entity").execute().await?;
242    /// stream.pause().await?;
243    /// // Background task stops reading
244    /// // Consumer can still poll for remaining buffered items
245    /// # Ok(())
246    /// # }
247    /// ```
248    pub async fn pause(&mut self) -> Result<()> {
249        let entity = self.entity.clone();
250
251        // Update lightweight atomic state first (fast path)
252        self.state_atomic_set_paused();
253
254        let pr = self.ensure_pause_resume();
255        let mut state = pr.state.lock().await;
256
257        match *state {
258            StreamState::Running => {
259                // Signal background task to pause
260                pr.pause_signal.notify_one();
261                // Update state
262                *state = StreamState::Paused;
263
264                // Record metric
265                crate::metrics::counters::stream_paused(&entity);
266                Ok(())
267            }
268            StreamState::Paused => {
269                // Idempotent: already paused
270                Ok(())
271            }
272            StreamState::Completed | StreamState::Failed => {
273                // Cannot pause a terminal stream
274                Err(WireError::Protocol(
275                    "cannot pause a completed or failed stream".to_string(),
276                ))
277            }
278        }
279    }
280
281    /// Resume the stream
282    ///
283    /// Resumes the background task to continue reading data from Postgres.
284    /// Only has an effect if the stream is currently paused.
285    ///
286    /// This method is idempotent: calling `resume()` before `pause()` or on an
287    /// already-running stream is a no-op.
288    ///
289    /// Returns an error if the stream has already completed or failed.
290    ///
291    /// # Errors
292    ///
293    /// Returns [`WireError::Protocol`] if the stream is in a terminal state (completed or failed).
294    ///
295    /// # Example
296    ///
297    /// ```no_run
298    /// // Requires: live Postgres streaming connection.
299    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
300    /// let mut stream = client.query::<serde_json::Value>("entity").execute().await?;
301    /// stream.resume().await?;
302    /// // Background task resumes reading
303    /// // Consumer can poll for more items
304    /// # Ok(())
305    /// # }
306    /// ```
307    pub async fn resume(&mut self) -> Result<()> {
308        // Update lightweight atomic state first (fast path)
309        // Check atomic state before borrowing pause_resume
310        let current = self.state_atomic_get();
311
312        // Resume only makes sense if pause/resume was initialized
313        if let Some(ref mut pr) = self.pause_resume {
314            let entity = self.entity.clone();
315
316            // Note: Set to RUNNING to reflect resumed state
317            if current == STATE_PAUSED {
318                // Only update atomic if currently paused
319                self.state_atomic.store(STATE_RUNNING, Ordering::Release);
320            }
321
322            let mut state = pr.state.lock().await;
323
324            match *state {
325                StreamState::Paused => {
326                    // Signal background task to resume
327                    pr.resume_signal.notify_one();
328                    // Update state
329                    *state = StreamState::Running;
330
331                    // Record metric
332                    crate::metrics::counters::stream_resumed(&entity);
333                    Ok(())
334                }
335                StreamState::Running => {
336                    // Idempotent: already running (or resume before pause)
337                    Ok(())
338                }
339                StreamState::Completed | StreamState::Failed => {
340                    // Cannot resume a terminal stream
341                    Err(WireError::Protocol(
342                        "cannot resume a completed or failed stream".to_string(),
343                    ))
344                }
345            }
346        } else {
347            // No pause/resume infrastructure (never paused), idempotent no-op
348            Ok(())
349        }
350    }
351
352    /// Pause the stream with a diagnostic reason
353    ///
354    /// Like `pause()`, but logs the provided reason for diagnostic purposes.
355    /// This helps track why streams are being paused (e.g., "backpressure",
356    /// "maintenance", "rate limit").
357    ///
358    /// # Arguments
359    ///
360    /// * `reason` - Optional reason for pausing (logged at debug level)
361    ///
362    /// # Errors
363    ///
364    /// Returns [`WireError::Protocol`] if the stream is in a terminal state (completed or failed).
365    ///
366    /// # Example
367    ///
368    /// ```no_run
369    /// // Requires: live Postgres streaming connection.
370    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
371    /// let mut stream = client.query::<serde_json::Value>("entity").execute().await?;
372    /// stream.pause_with_reason("backpressure: consumer busy").await?;
373    /// # Ok(())
374    /// # }
375    /// ```
376    pub async fn pause_with_reason(&mut self, reason: &str) -> Result<()> {
377        tracing::debug!("pausing stream: {}", reason);
378        self.pause().await
379    }
380
381    /// Clone internal state for passing to background task (only if pause/resume is initialized)
382    pub(crate) fn clone_state(&self) -> Option<Arc<Mutex<StreamState>>> {
383        self.pause_resume.as_ref().map(|pr| Arc::clone(&pr.state))
384    }
385
386    /// Clone pause signal for passing to background task (only if pause/resume is initialized)
387    pub(crate) fn clone_pause_signal(&self) -> Option<Arc<Notify>> {
388        self.pause_resume
389            .as_ref()
390            .map(|pr| Arc::clone(&pr.pause_signal))
391    }
392
393    /// Clone resume signal for passing to background task (only if pause/resume is initialized)
394    pub(crate) fn clone_resume_signal(&self) -> Option<Arc<Notify>> {
395        self.pause_resume
396            .as_ref()
397            .map(|pr| Arc::clone(&pr.resume_signal))
398    }
399
400    // =========================================================================
401    // Lightweight state machine methods
402    // =========================================================================
403
404    /// Clone atomic state for passing to background task
405    pub(crate) fn clone_state_atomic(&self) -> Arc<AtomicU8> {
406        Arc::clone(&self.state_atomic)
407    }
408
409    /// Get current state from atomic (fast path, no locks)
410    pub(crate) fn state_atomic_get(&self) -> u8 {
411        self.state_atomic.load(Ordering::Acquire)
412    }
413
414    /// Set state to paused using atomic
415    pub(crate) fn state_atomic_set_paused(&self) {
416        self.state_atomic.store(STATE_PAUSED, Ordering::Release);
417    }
418
419    /// Set state to completed using atomic
420    pub(crate) fn state_atomic_set_completed(&self) {
421        self.state_atomic.store(STATE_COMPLETED, Ordering::Release);
422    }
423
424    /// Set state to failed using atomic
425    pub(crate) fn state_atomic_set_failed(&self) {
426        self.state_atomic.store(STATE_FAILED, Ordering::Release);
427    }
428
429    /// Get current stream statistics
430    ///
431    /// Returns a snapshot of stream state without consuming any items.
432    /// This can be called at any time to monitor progress.
433    ///
434    /// # Example
435    ///
436    /// ```no_run
437    /// // Requires: live Postgres streaming connection.
438    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
439    /// let stream = client.query::<serde_json::Value>("entity").execute().await?;
440    /// let stats = stream.stats();
441    /// println!("Buffered: {}, Yielded: {}", stats.items_buffered, stats.total_rows_yielded);
442    /// # Ok(())
443    /// # }
444    /// ```
445    pub fn stats(&self) -> StreamStats {
446        let items_buffered = self.receiver.len();
447        let estimated_memory = items_buffered * 2048; // Conservative: 2KB per item
448        let total_rows_yielded = self.rows_yielded.load(Ordering::Relaxed);
449        let total_rows_filtered = self.rows_filtered.load(Ordering::Relaxed);
450
451        StreamStats {
452            items_buffered,
453            estimated_memory,
454            total_rows_yielded,
455            total_rows_filtered,
456        }
457    }
458}
459
460impl Stream for JsonStream {
461    type Item = Result<Value>;
462
463    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
464        // Sample metrics: record 1 in every 1000 polls to avoid hot path overhead
465        // For 100K rows, this records ~100 times instead of 100K times
466        let poll_idx = self.poll_count.fetch_add(1, Ordering::Relaxed);
467        if poll_idx.is_multiple_of(1000) {
468            let occupancy = self.receiver.len() as u64;
469            crate::metrics::histograms::channel_occupancy(&self.entity, occupancy);
470            crate::metrics::gauges::stream_buffered_items(&self.entity, occupancy as usize);
471        }
472
473        // Check memory limit BEFORE receiving (pre-enqueue strategy)
474        // This stops consuming when buffer reaches limit
475        if let Some(limit) = self.max_memory {
476            let items_buffered = self.receiver.len();
477            let estimated_memory = items_buffered * 2048; // Conservative: 2KB per item
478
479            // Check soft limit thresholds first (warn before fail)
480            if let Some(fail_threshold) = self.soft_limit_fail_threshold {
481                let threshold_bytes = (limit as f32 * fail_threshold) as usize;
482                if estimated_memory > threshold_bytes {
483                    // Record metric for memory limit exceeded
484                    crate::metrics::counters::memory_limit_exceeded(&self.entity);
485                    self.state_atomic_set_failed();
486                    return Poll::Ready(Some(Err(WireError::MemoryLimitExceeded {
487                        limit,
488                        estimated_memory,
489                    })));
490                }
491            } else if estimated_memory > limit {
492                // Hard limit (no soft limits configured)
493                crate::metrics::counters::memory_limit_exceeded(&self.entity);
494                self.state_atomic_set_failed();
495                return Poll::Ready(Some(Err(WireError::MemoryLimitExceeded {
496                    limit,
497                    estimated_memory,
498                })));
499            }
500
501            // Note: Warn threshold would be handled by instrumentation/logging layer
502            // This is for application-level monitoring, not a hard error
503        }
504
505        match self.receiver.poll_recv(cx) {
506            Poll::Ready(Some(Ok(value))) => Poll::Ready(Some(Ok(value))),
507            Poll::Ready(Some(Err(e))) => {
508                // Stream encountered an error
509                self.state_atomic_set_failed();
510                Poll::Ready(Some(Err(e)))
511            }
512            Poll::Ready(None) => {
513                // Stream completed normally
514                self.state_atomic_set_completed();
515                Poll::Ready(None)
516            }
517            Poll::Pending => Poll::Pending,
518        }
519    }
520}
521
522/// Extract JSON bytes from `DataRow` message
523///
524/// # Errors
525///
526/// Returns [`WireError::Protocol`] if the message is not a `DataRow`, the row does not
527/// have exactly one field, or the field value is null.
528pub fn extract_json_bytes(msg: &BackendMessage) -> Result<Bytes> {
529    match msg {
530        BackendMessage::DataRow(fields) => match fields.as_slice() {
531            [only] => only
532                .clone()
533                .ok_or_else(|| WireError::Protocol("null data field".into())),
534            _ => Err(WireError::Protocol(format!(
535                "expected 1 field, got {}",
536                fields.len()
537            ))),
538        },
539        _ => Err(WireError::Protocol("expected DataRow".into())),
540    }
541}
542
543/// Parse JSON bytes into Value
544///
545/// # Errors
546///
547/// Returns [`WireError`] if the bytes are not valid JSON.
548pub fn parse_json(data: Bytes) -> Result<Value> {
549    let value: Value = serde_json::from_slice(&data)?;
550    Ok(value)
551}
552
553#[cfg(test)]
554mod tests;