Skip to main content

fraiseql_wire/stream/
json_stream.rs

1//! JSON stream implementation
2
3use crate::protocol::BackendMessage;
4use crate::{Result, WireError};
5use bytes::Bytes;
6use futures::stream::Stream;
7use serde_json::Value;
8use std::pin::Pin;
9use std::sync::atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering};
10use std::sync::Arc;
11use std::task::{Context, Poll};
12use std::time::Duration;
13use tokio::sync::{mpsc, Mutex, Notify};
14
// Raw `u8` encodings of the stream state.
// They are stored in an `AtomicU8` so the state can be read and written
// without taking the async `Mutex`.
const STATE_RUNNING: u8 = 0;
const STATE_PAUSED: u8 = 1;
const STATE_COMPLETED: u8 = 2;
const STATE_FAILED: u8 = 3;
21
/// Lifecycle state of a JSON stream.
///
/// A stream begins in [`StreamState::Running`]. It may move to
/// [`StreamState::Paused`], and it ends in one of the terminal states
/// [`StreamState::Completed`] or [`StreamState::Failed`].
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamState {
    /// The background task is actively reading from Postgres.
    Running,
    /// The background task is suspended; the connection stays alive.
    Paused,
    /// The query finished normally.
    Completed,
    /// The query ended with an error.
    Failed,
}
39
/// Point-in-time statistics for a stream.
///
/// A snapshot is a plain value: taking one does not consume any items, and
/// it is not updated after it has been created.
#[derive(Debug, Clone)]
pub struct StreamStats {
    /// Number of items sitting in the channel when the snapshot was taken.
    pub items_buffered: usize,
    /// Estimated memory, in bytes, held by the buffered items.
    pub estimated_memory: usize,
    /// Total rows handed to the consumer so far.
    pub total_rows_yielded: u64,
    /// Total rows dropped by Rust-side predicates.
    pub total_rows_filtered: u64,
}

impl StreamStats {
    /// Build a snapshot with every counter set to zero.
    ///
    /// Handy as a starting value and in tests.
    pub const fn zero() -> Self {
        Self {
            total_rows_filtered: 0,
            total_rows_yielded: 0,
            estimated_memory: 0,
            items_buffered: 0,
        }
    }
}
69
70/// JSON value stream
71pub struct JsonStream {
72    receiver: mpsc::Receiver<Result<Value>>,
73    _cancel_tx: mpsc::Sender<()>,  // Dropped when stream is dropped
74    entity: String,                // Entity name for metrics
75    rows_yielded: Arc<AtomicU64>,  // Counter of items yielded to consumer
76    rows_filtered: Arc<AtomicU64>, // Counter of items filtered
77    max_memory: Option<usize>,     // Optional memory limit in bytes
78    soft_limit_fail_threshold: Option<f32>, // Fail at threshold % (0.0-1.0)
79
80    // Lightweight state tracking (cheap AtomicU8)
81    // Used for fast state checks on all queries
82    // Values: 0=Running, 1=Paused, 2=Complete, 3=Error
83    state_atomic: Arc<AtomicU8>,
84
85    // Pause/resume state machine (lazily initialized)
86    // Only allocated if pause() is called
87    pause_resume: Option<PauseResumeState>,
88
89    // Sampling counter for metrics recording (sample 1 in N polls)
90    poll_count: AtomicU64, // Counter for sampling metrics
91}
92
93/// Pause/resume state (lazily allocated)
94/// Only created when `pause()` is first called
95pub struct PauseResumeState {
96    state: Arc<Mutex<StreamState>>,     // Current stream state
97    pause_signal: Arc<Notify>,          // Signal to pause background task
98    resume_signal: Arc<Notify>,         // Signal to resume background task
99    paused_occupancy: Arc<AtomicUsize>, // Buffered rows when paused
100    pause_timeout: Option<Duration>,    // Optional auto-resume timeout
101}
102
103impl JsonStream {
104    /// Create new JSON stream
105    pub(crate) fn new(
106        receiver: mpsc::Receiver<Result<Value>>,
107        cancel_tx: mpsc::Sender<()>,
108        entity: String,
109        max_memory: Option<usize>,
110        _soft_limit_warn_threshold: Option<f32>,
111        soft_limit_fail_threshold: Option<f32>,
112    ) -> Self {
113        Self {
114            receiver,
115            _cancel_tx: cancel_tx,
116            entity,
117            rows_yielded: Arc::new(AtomicU64::new(0)),
118            rows_filtered: Arc::new(AtomicU64::new(0)),
119            max_memory,
120            soft_limit_fail_threshold,
121
122            // Initialize lightweight atomic state
123            state_atomic: Arc::new(AtomicU8::new(STATE_RUNNING)),
124
125            // Pause/resume lazily initialized (only if pause() is called)
126            pause_resume: None,
127
128            // Initialize sampling counter
129            poll_count: AtomicU64::new(0),
130        }
131    }
132
133    /// Initialize pause/resume state (called on first `pause()`)
134    fn ensure_pause_resume(&mut self) -> &mut PauseResumeState {
135        if self.pause_resume.is_none() {
136            self.pause_resume = Some(PauseResumeState {
137                state: Arc::new(Mutex::new(StreamState::Running)),
138                pause_signal: Arc::new(Notify::new()),
139                resume_signal: Arc::new(Notify::new()),
140                paused_occupancy: Arc::new(AtomicUsize::new(0)),
141                pause_timeout: None,
142            });
143        }
144        self.pause_resume
145            .as_mut()
146            .expect("pause_resume initialized in the block above")
147    }
148
149    /// Get current stream state
150    ///
151    /// Returns the current state of the stream (Running, Paused, Completed, or Failed).
152    /// This is a synchronous getter that doesn't require awaiting.
153    ///
154    /// Note: This is a best-effort snapshot that may return slightly stale state
155    /// due to the non-blocking nature of atomic reads.
156    pub fn state_snapshot(&self) -> StreamState {
157        // Read from lightweight atomic state (fast path, no locks)
158        match self.state_atomic.load(Ordering::Acquire) {
159            STATE_RUNNING => StreamState::Running,
160            STATE_PAUSED => StreamState::Paused,
161            STATE_COMPLETED => StreamState::Completed,
162            STATE_FAILED => StreamState::Failed,
163            _ => {
164                // Unknown state - fall back to checking if channel is closed
165                if self.receiver.is_closed() {
166                    StreamState::Completed
167                } else {
168                    StreamState::Running
169                }
170            }
171        }
172    }
173
174    /// Get buffered rows when paused
175    ///
176    /// Returns the number of rows buffered in the channel when the stream was paused.
177    /// Only meaningful when stream is in Paused state.
178    pub fn paused_occupancy(&self) -> usize {
179        self.pause_resume
180            .as_ref()
181            .map_or(0, |pr| pr.paused_occupancy.load(Ordering::Relaxed))
182    }
183
184    /// Set timeout for pause (auto-resume after duration)
185    ///
186    /// When a stream is paused, the background task will automatically resume
187    /// after the specified duration expires, even if `resume()` is not called.
188    ///
189    /// # Arguments
190    ///
191    /// * `duration` - How long to stay paused before auto-resuming
192    ///
193    /// # Examples
194    ///
195    /// ```text
196    /// // Requires: a JsonStream instance from execute_query.
197    /// // Note: set_pause_timeout is on JsonStream, not QueryStream.
198    /// // Use FraiseClient::execute_query (internal) to obtain a JsonStream directly.
199    /// use std::time::Duration;
200    /// stream.set_pause_timeout(Duration::from_secs(5));
201    /// stream.pause().await?;  // Will auto-resume after 5 seconds
202    /// ```
203    pub fn set_pause_timeout(&mut self, duration: Duration) {
204        self.ensure_pause_resume().pause_timeout = Some(duration);
205        tracing::debug!("pause timeout set to {:?}", duration);
206    }
207
208    /// Clear pause timeout (no auto-resume)
209    pub fn clear_pause_timeout(&mut self) {
210        if let Some(ref mut pr) = self.pause_resume {
211            pr.pause_timeout = None;
212            tracing::debug!("pause timeout cleared");
213        }
214    }
215
216    /// Get current pause timeout (if set)
217    pub(crate) fn pause_timeout(&self) -> Option<Duration> {
218        self.pause_resume.as_ref().and_then(|pr| pr.pause_timeout)
219    }
220
221    /// Pause the stream
222    ///
223    /// Suspends the background task from reading more data from Postgres.
224    /// The connection remains open and can be resumed later.
225    /// Buffered rows are preserved and can be consumed normally.
226    ///
227    /// This method is idempotent: calling `pause()` on an already-paused stream is a no-op.
228    ///
229    /// Returns an error if the stream has already completed or failed.
230    ///
231    /// # Errors
232    ///
233    /// Returns [`WireError::Protocol`] if the stream is in a terminal state (completed or failed).
234    ///
235    /// # Example
236    ///
237    /// ```no_run
238    /// // Requires: live Postgres streaming connection.
239    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
240    /// let mut stream = client.query::<serde_json::Value>("entity").execute().await?;
241    /// stream.pause().await?;
242    /// // Background task stops reading
243    /// // Consumer can still poll for remaining buffered items
244    /// # Ok(())
245    /// # }
246    /// ```
247    pub async fn pause(&mut self) -> Result<()> {
248        let entity = self.entity.clone();
249
250        // Update lightweight atomic state first (fast path)
251        self.state_atomic_set_paused();
252
253        let pr = self.ensure_pause_resume();
254        let mut state = pr.state.lock().await;
255
256        match *state {
257            StreamState::Running => {
258                // Signal background task to pause
259                pr.pause_signal.notify_one();
260                // Update state
261                *state = StreamState::Paused;
262
263                // Record metric
264                crate::metrics::counters::stream_paused(&entity);
265                Ok(())
266            }
267            StreamState::Paused => {
268                // Idempotent: already paused
269                Ok(())
270            }
271            StreamState::Completed | StreamState::Failed => {
272                // Cannot pause a terminal stream
273                Err(WireError::Protocol(
274                    "cannot pause a completed or failed stream".to_string(),
275                ))
276            }
277        }
278    }
279
280    /// Resume the stream
281    ///
282    /// Resumes the background task to continue reading data from Postgres.
283    /// Only has an effect if the stream is currently paused.
284    ///
285    /// This method is idempotent: calling `resume()` before `pause()` or on an
286    /// already-running stream is a no-op.
287    ///
288    /// Returns an error if the stream has already completed or failed.
289    ///
290    /// # Errors
291    ///
292    /// Returns [`WireError::Protocol`] if the stream is in a terminal state (completed or failed).
293    ///
294    /// # Example
295    ///
296    /// ```no_run
297    /// // Requires: live Postgres streaming connection.
298    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
299    /// let mut stream = client.query::<serde_json::Value>("entity").execute().await?;
300    /// stream.resume().await?;
301    /// // Background task resumes reading
302    /// // Consumer can poll for more items
303    /// # Ok(())
304    /// # }
305    /// ```
306    pub async fn resume(&mut self) -> Result<()> {
307        // Update lightweight atomic state first (fast path)
308        // Check atomic state before borrowing pause_resume
309        let current = self.state_atomic_get();
310
311        // Resume only makes sense if pause/resume was initialized
312        if let Some(ref mut pr) = self.pause_resume {
313            let entity = self.entity.clone();
314
315            // Note: Set to RUNNING to reflect resumed state
316            if current == STATE_PAUSED {
317                // Only update atomic if currently paused
318                self.state_atomic.store(STATE_RUNNING, Ordering::Release);
319            }
320
321            let mut state = pr.state.lock().await;
322
323            match *state {
324                StreamState::Paused => {
325                    // Signal background task to resume
326                    pr.resume_signal.notify_one();
327                    // Update state
328                    *state = StreamState::Running;
329
330                    // Record metric
331                    crate::metrics::counters::stream_resumed(&entity);
332                    Ok(())
333                }
334                StreamState::Running => {
335                    // Idempotent: already running (or resume before pause)
336                    Ok(())
337                }
338                StreamState::Completed | StreamState::Failed => {
339                    // Cannot resume a terminal stream
340                    Err(WireError::Protocol(
341                        "cannot resume a completed or failed stream".to_string(),
342                    ))
343                }
344            }
345        } else {
346            // No pause/resume infrastructure (never paused), idempotent no-op
347            Ok(())
348        }
349    }
350
351    /// Pause the stream with a diagnostic reason
352    ///
353    /// Like `pause()`, but logs the provided reason for diagnostic purposes.
354    /// This helps track why streams are being paused (e.g., "backpressure",
355    /// "maintenance", "rate limit").
356    ///
357    /// # Arguments
358    ///
359    /// * `reason` - Optional reason for pausing (logged at debug level)
360    ///
361    /// # Errors
362    ///
363    /// Returns [`WireError::Protocol`] if the stream is in a terminal state (completed or failed).
364    ///
365    /// # Example
366    ///
367    /// ```no_run
368    /// // Requires: live Postgres streaming connection.
369    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
370    /// let mut stream = client.query::<serde_json::Value>("entity").execute().await?;
371    /// stream.pause_with_reason("backpressure: consumer busy").await?;
372    /// # Ok(())
373    /// # }
374    /// ```
375    pub async fn pause_with_reason(&mut self, reason: &str) -> Result<()> {
376        tracing::debug!("pausing stream: {}", reason);
377        self.pause().await
378    }
379
380    /// Clone internal state for passing to background task (only if pause/resume is initialized)
381    pub(crate) fn clone_state(&self) -> Option<Arc<Mutex<StreamState>>> {
382        self.pause_resume.as_ref().map(|pr| Arc::clone(&pr.state))
383    }
384
385    /// Clone pause signal for passing to background task (only if pause/resume is initialized)
386    pub(crate) fn clone_pause_signal(&self) -> Option<Arc<Notify>> {
387        self.pause_resume
388            .as_ref()
389            .map(|pr| Arc::clone(&pr.pause_signal))
390    }
391
392    /// Clone resume signal for passing to background task (only if pause/resume is initialized)
393    pub(crate) fn clone_resume_signal(&self) -> Option<Arc<Notify>> {
394        self.pause_resume
395            .as_ref()
396            .map(|pr| Arc::clone(&pr.resume_signal))
397    }
398
399    // =========================================================================
400    // Lightweight state machine methods
401    // =========================================================================
402
403    /// Clone atomic state for passing to background task
404    pub(crate) fn clone_state_atomic(&self) -> Arc<AtomicU8> {
405        Arc::clone(&self.state_atomic)
406    }
407
408    /// Get current state from atomic (fast path, no locks)
409    pub(crate) fn state_atomic_get(&self) -> u8 {
410        self.state_atomic.load(Ordering::Acquire)
411    }
412
413    /// Set state to paused using atomic
414    pub(crate) fn state_atomic_set_paused(&self) {
415        self.state_atomic.store(STATE_PAUSED, Ordering::Release);
416    }
417
418    /// Set state to completed using atomic
419    pub(crate) fn state_atomic_set_completed(&self) {
420        self.state_atomic.store(STATE_COMPLETED, Ordering::Release);
421    }
422
423    /// Set state to failed using atomic
424    pub(crate) fn state_atomic_set_failed(&self) {
425        self.state_atomic.store(STATE_FAILED, Ordering::Release);
426    }
427
428    /// Get current stream statistics
429    ///
430    /// Returns a snapshot of stream state without consuming any items.
431    /// This can be called at any time to monitor progress.
432    ///
433    /// # Example
434    ///
435    /// ```no_run
436    /// // Requires: live Postgres streaming connection.
437    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
438    /// let stream = client.query::<serde_json::Value>("entity").execute().await?;
439    /// let stats = stream.stats();
440    /// println!("Buffered: {}, Yielded: {}", stats.items_buffered, stats.total_rows_yielded);
441    /// # Ok(())
442    /// # }
443    /// ```
444    pub fn stats(&self) -> StreamStats {
445        let items_buffered = self.receiver.len();
446        let estimated_memory = items_buffered * 2048; // Conservative: 2KB per item
447        let total_rows_yielded = self.rows_yielded.load(Ordering::Relaxed);
448        let total_rows_filtered = self.rows_filtered.load(Ordering::Relaxed);
449
450        StreamStats {
451            items_buffered,
452            estimated_memory,
453            total_rows_yielded,
454            total_rows_filtered,
455        }
456    }
457}
458
459impl Stream for JsonStream {
460    type Item = Result<Value>;
461
462    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
463        // Sample metrics: record 1 in every 1000 polls to avoid hot path overhead
464        // For 100K rows, this records ~100 times instead of 100K times
465        let poll_idx = self.poll_count.fetch_add(1, Ordering::Relaxed);
466        if poll_idx.is_multiple_of(1000) {
467            let occupancy = self.receiver.len() as u64;
468            crate::metrics::histograms::channel_occupancy(&self.entity, occupancy);
469            crate::metrics::gauges::stream_buffered_items(&self.entity, occupancy as usize);
470        }
471
472        // Check memory limit BEFORE receiving (pre-enqueue strategy)
473        // This stops consuming when buffer reaches limit
474        if let Some(limit) = self.max_memory {
475            let items_buffered = self.receiver.len();
476            let estimated_memory = items_buffered * 2048; // Conservative: 2KB per item
477
478            // Check soft limit thresholds first (warn before fail)
479            if let Some(fail_threshold) = self.soft_limit_fail_threshold {
480                let threshold_bytes = (limit as f32 * fail_threshold) as usize;
481                if estimated_memory > threshold_bytes {
482                    // Record metric for memory limit exceeded
483                    crate::metrics::counters::memory_limit_exceeded(&self.entity);
484                    self.state_atomic_set_failed();
485                    return Poll::Ready(Some(Err(WireError::MemoryLimitExceeded {
486                        limit,
487                        estimated_memory,
488                    })));
489                }
490            } else if estimated_memory > limit {
491                // Hard limit (no soft limits configured)
492                crate::metrics::counters::memory_limit_exceeded(&self.entity);
493                self.state_atomic_set_failed();
494                return Poll::Ready(Some(Err(WireError::MemoryLimitExceeded {
495                    limit,
496                    estimated_memory,
497                })));
498            }
499
500            // Note: Warn threshold would be handled by instrumentation/logging layer
501            // This is for application-level monitoring, not a hard error
502        }
503
504        match self.receiver.poll_recv(cx) {
505            Poll::Ready(Some(Ok(value))) => Poll::Ready(Some(Ok(value))),
506            Poll::Ready(Some(Err(e))) => {
507                // Stream encountered an error
508                self.state_atomic_set_failed();
509                Poll::Ready(Some(Err(e)))
510            }
511            Poll::Ready(None) => {
512                // Stream completed normally
513                self.state_atomic_set_completed();
514                Poll::Ready(None)
515            }
516            Poll::Pending => Poll::Pending,
517        }
518    }
519}
520
521/// Extract JSON bytes from `DataRow` message
522///
523/// # Errors
524///
525/// Returns [`WireError::Protocol`] if the message is not a `DataRow`, the row does not
526/// have exactly one field, or the field value is null.
527pub fn extract_json_bytes(msg: &BackendMessage) -> Result<Bytes> {
528    match msg {
529        BackendMessage::DataRow(fields) => {
530            if fields.len() != 1 {
531                return Err(WireError::Protocol(format!(
532                    "expected 1 field, got {}",
533                    fields.len()
534                )));
535            }
536
537            let field = &fields[0];
538            field
539                .clone()
540                .ok_or_else(|| WireError::Protocol("null data field".into()))
541        }
542        _ => Err(WireError::Protocol("expected DataRow".into())),
543    }
544}
545
546/// Parse JSON bytes into Value
547///
548/// # Errors
549///
550/// Returns [`WireError`] if the bytes are not valid JSON.
551pub fn parse_json(data: Bytes) -> Result<Value> {
552    let value: Value = serde_json::from_slice(&data)?;
553    Ok(value)
554}
555
#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used)] // Reason: test code, panics are acceptable
    use super::*;

    /// JSON document shared by the extraction and parsing tests.
    const SAMPLE_JSON: &[u8] = b"{\"key\":\"value\"}";

    #[test]
    fn test_extract_json_bytes() {
        let payload = Bytes::from_static(SAMPLE_JSON);
        let row = BackendMessage::DataRow(vec![Some(payload.clone())]);

        assert_eq!(extract_json_bytes(&row).unwrap(), payload);
    }

    #[test]
    fn test_extract_null_field() {
        let row = BackendMessage::DataRow(vec![None]);
        let result = extract_json_bytes(&row);
        assert!(
            matches!(result, Err(WireError::Protocol(_))),
            "expected Protocol error for null field, got: {result:?}"
        );
    }

    #[test]
    fn test_parse_json() {
        let parsed = parse_json(Bytes::from_static(SAMPLE_JSON)).unwrap();

        assert_eq!(parsed["key"], "value");
    }

    #[test]
    fn test_parse_invalid_json() {
        let result = parse_json(Bytes::from_static(b"not json"));
        assert!(
            matches!(result, Err(WireError::JsonDecode(_))),
            "expected JsonDecode error for invalid JSON, got: {result:?}"
        );
    }

    #[test]
    fn test_stream_stats_creation() {
        let StreamStats {
            items_buffered,
            estimated_memory,
            total_rows_yielded,
            total_rows_filtered,
        } = StreamStats::zero();

        assert_eq!(items_buffered, 0);
        assert_eq!(estimated_memory, 0);
        assert_eq!(total_rows_yielded, 0);
        assert_eq!(total_rows_filtered, 0);
    }

    #[test]
    fn test_stream_stats_memory_estimation() {
        let stats = StreamStats {
            items_buffered: 100,
            estimated_memory: 100 * 2048,
            total_rows_yielded: 100,
            total_rows_filtered: 10,
        };

        // 100 items * 2KB per item = 200KB
        assert_eq!(stats.estimated_memory, 204800);
    }

    #[test]
    fn test_stream_stats_clone() {
        let original = StreamStats {
            items_buffered: 50,
            estimated_memory: 100000,
            total_rows_yielded: 500,
            total_rows_filtered: 50,
        };

        let copy = original.clone();
        assert_eq!(copy.items_buffered, original.items_buffered);
        assert_eq!(copy.estimated_memory, original.estimated_memory);
        assert_eq!(copy.total_rows_yielded, original.total_rows_yielded);
        assert_eq!(copy.total_rows_filtered, original.total_rows_filtered);
    }

    #[test]
    fn test_stream_state_constants() {
        // Every pair of state encodings must differ.
        let codes = [STATE_RUNNING, STATE_PAUSED, STATE_COMPLETED, STATE_FAILED];
        for (idx, first) in codes.iter().enumerate() {
            for second in &codes[idx + 1..] {
                assert_ne!(first, second);
            }
        }
    }

    #[test]
    fn test_stream_state_enum_equality() {
        let variants = [
            StreamState::Running,
            StreamState::Paused,
            StreamState::Completed,
            StreamState::Failed,
        ];
        for variant in variants {
            assert_eq!(variant, variant);
        }
        assert_ne!(StreamState::Running, StreamState::Paused);
    }
}
655}