Skip to main content

fraiseql_wire/stream/
json_stream.rs

1//! JSON stream implementation
2
3use crate::protocol::BackendMessage;
4use crate::{Error, Result};
5use bytes::Bytes;
6use futures::stream::Stream;
7use serde_json::Value;
8use std::pin::Pin;
9use std::sync::atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering};
10use std::sync::Arc;
11use std::task::{Context, Poll};
12use std::time::Duration;
13use tokio::sync::{mpsc, Mutex, Notify};
14
/// Lifecycle of a JSON stream.
///
/// Every stream begins as `Running`. It may move to `Paused` and back again,
/// and it finishes in exactly one of the terminal states `Completed` or `Failed`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StreamState {
    /// Background task is actively reading from Postgres
    Running = 0,
    /// Background task is suspended; the connection stays open
    Paused = 1,
    /// Query finished normally
    Completed = 2,
    /// Query ended with an error
    Failed = 3,
}

// Compact `u8` encodings of `StreamState`, stored in an atomic so hot paths can
// read the state without a mutex. They are derived from the enum discriminants
// above so the two representations cannot drift apart.
const STATE_RUNNING: u8 = StreamState::Running as u8;
const STATE_PAUSED: u8 = StreamState::Paused as u8;
const STATE_COMPLETED: u8 = StreamState::Completed as u8;
const STATE_FAILED: u8 = StreamState::Failed as u8;
38
/// Point-in-time statistics for a JSON stream.
///
/// Reading these values never consumes rows. Each field is sampled
/// independently, so the snapshot is not guaranteed to be mutually consistent.
#[derive(Clone, Debug)]
pub struct StreamStats {
    /// Rows currently waiting in the channel (bounded by the channel capacity)
    pub items_buffered: usize,
    /// Coarse estimate, in bytes, of the memory held by buffered rows
    pub estimated_memory: usize,
    /// Rows handed to the consumer so far
    pub total_rows_yielded: u64,
    /// Rows rejected by Rust-side predicates so far
    pub total_rows_filtered: u64,
}

impl StreamStats {
    /// Build a snapshot in which every counter is zero.
    ///
    /// Handy as an initial value and in tests.
    pub fn zero() -> Self {
        StreamStats {
            total_rows_filtered: 0,
            total_rows_yielded: 0,
            estimated_memory: 0,
            items_buffered: 0,
        }
    }
}
68
69/// JSON value stream
70pub struct JsonStream {
71    receiver: mpsc::Receiver<Result<Value>>,
72    _cancel_tx: mpsc::Sender<()>,  // Dropped when stream is dropped
73    entity: String,                // Entity name for metrics
74    rows_yielded: Arc<AtomicU64>,  // Counter of items yielded to consumer
75    rows_filtered: Arc<AtomicU64>, // Counter of items filtered
76    max_memory: Option<usize>,     // Optional memory limit in bytes
77    soft_limit_fail_threshold: Option<f32>, // Fail at threshold % (0.0-1.0)
78
79    // Lightweight state tracking (cheap AtomicU8)
80    // Used for fast state checks on all queries
81    // Values: 0=Running, 1=Paused, 2=Complete, 3=Error
82    state_atomic: Arc<AtomicU8>,
83
84    // Pause/resume state machine (lazily initialized)
85    // Only allocated if pause() is called
86    pause_resume: Option<PauseResumeState>,
87
88    // Sampling counter for metrics recording (sample 1 in N polls)
89    poll_count: AtomicU64, // Counter for sampling metrics
90}
91
92/// Pause/resume state (lazily allocated)
93/// Only created when pause() is first called
94pub struct PauseResumeState {
95    state: Arc<Mutex<StreamState>>,     // Current stream state
96    pause_signal: Arc<Notify>,          // Signal to pause background task
97    resume_signal: Arc<Notify>,         // Signal to resume background task
98    paused_occupancy: Arc<AtomicUsize>, // Buffered rows when paused
99    pause_timeout: Option<Duration>,    // Optional auto-resume timeout
100}
101
102impl JsonStream {
103    /// Create new JSON stream
104    pub(crate) fn new(
105        receiver: mpsc::Receiver<Result<Value>>,
106        cancel_tx: mpsc::Sender<()>,
107        entity: String,
108        max_memory: Option<usize>,
109        _soft_limit_warn_threshold: Option<f32>,
110        soft_limit_fail_threshold: Option<f32>,
111    ) -> Self {
112        Self {
113            receiver,
114            _cancel_tx: cancel_tx,
115            entity,
116            rows_yielded: Arc::new(AtomicU64::new(0)),
117            rows_filtered: Arc::new(AtomicU64::new(0)),
118            max_memory,
119            soft_limit_fail_threshold,
120
121            // Initialize lightweight atomic state
122            state_atomic: Arc::new(AtomicU8::new(STATE_RUNNING)),
123
124            // Pause/resume lazily initialized (only if pause() is called)
125            pause_resume: None,
126
127            // Initialize sampling counter
128            poll_count: AtomicU64::new(0),
129        }
130    }
131
132    /// Initialize pause/resume state (called on first pause())
133    fn ensure_pause_resume(&mut self) -> &mut PauseResumeState {
134        if self.pause_resume.is_none() {
135            self.pause_resume = Some(PauseResumeState {
136                state: Arc::new(Mutex::new(StreamState::Running)),
137                pause_signal: Arc::new(Notify::new()),
138                resume_signal: Arc::new(Notify::new()),
139                paused_occupancy: Arc::new(AtomicUsize::new(0)),
140                pause_timeout: None,
141            });
142        }
143        self.pause_resume.as_mut().unwrap()
144    }
145
146    /// Get current stream state
147    ///
148    /// Returns the current state of the stream (Running, Paused, Completed, or Failed).
149    /// This is a synchronous getter that doesn't require awaiting.
150    ///
151    /// Note: This is a best-effort snapshot that may return slightly stale state
152    /// due to the non-blocking nature of atomic reads.
153    pub fn state_snapshot(&self) -> StreamState {
154        // Read from lightweight atomic state (fast path, no locks)
155        match self.state_atomic.load(Ordering::Acquire) {
156            STATE_RUNNING => StreamState::Running,
157            STATE_PAUSED => StreamState::Paused,
158            STATE_COMPLETED => StreamState::Completed,
159            STATE_FAILED => StreamState::Failed,
160            _ => {
161                // Unknown state - fall back to checking if channel is closed
162                if self.receiver.is_closed() {
163                    StreamState::Completed
164                } else {
165                    StreamState::Running
166                }
167            }
168        }
169    }
170
171    /// Get buffered rows when paused
172    ///
173    /// Returns the number of rows buffered in the channel when the stream was paused.
174    /// Only meaningful when stream is in Paused state.
175    pub fn paused_occupancy(&self) -> usize {
176        self.pause_resume
177            .as_ref()
178            .map(|pr| pr.paused_occupancy.load(Ordering::Relaxed))
179            .unwrap_or(0)
180    }
181
182    /// Set timeout for pause (auto-resume after duration)
183    ///
184    /// When a stream is paused, the background task will automatically resume
185    /// after the specified duration expires, even if resume() is not called.
186    ///
187    /// # Arguments
188    ///
189    /// * `duration` - How long to stay paused before auto-resuming
190    ///
191    /// # Examples
192    ///
193    /// ```ignore
194    /// let mut stream = client.query::<T>("entity").execute().await?;
195    /// stream.set_pause_timeout(Duration::from_secs(5));
196    /// stream.pause().await?;  // Will auto-resume after 5 seconds
197    /// ```
198    pub fn set_pause_timeout(&mut self, duration: Duration) {
199        self.ensure_pause_resume().pause_timeout = Some(duration);
200        tracing::debug!("pause timeout set to {:?}", duration);
201    }
202
203    /// Clear pause timeout (no auto-resume)
204    pub fn clear_pause_timeout(&mut self) {
205        if let Some(ref mut pr) = self.pause_resume {
206            pr.pause_timeout = None;
207            tracing::debug!("pause timeout cleared");
208        }
209    }
210
211    /// Get current pause timeout (if set)
212    pub(crate) fn pause_timeout(&self) -> Option<Duration> {
213        self.pause_resume.as_ref().and_then(|pr| pr.pause_timeout)
214    }
215
216    /// Pause the stream
217    ///
218    /// Suspends the background task from reading more data from Postgres.
219    /// The connection remains open and can be resumed later.
220    /// Buffered rows are preserved and can be consumed normally.
221    ///
222    /// This method is idempotent: calling pause() on an already-paused stream is a no-op.
223    ///
224    /// Returns an error if the stream has already completed or failed.
225    ///
226    /// # Example
227    ///
228    /// ```ignore
229    /// stream.pause().await?;
230    /// // Background task stops reading
231    /// // Consumer can still poll for remaining buffered items
232    /// ```
233    pub async fn pause(&mut self) -> Result<()> {
234        let entity = self.entity.clone();
235
236        // Update lightweight atomic state first (fast path)
237        self.state_atomic_set_paused();
238
239        let pr = self.ensure_pause_resume();
240        let mut state = pr.state.lock().await;
241
242        match *state {
243            StreamState::Running => {
244                // Signal background task to pause
245                pr.pause_signal.notify_one();
246                // Update state
247                *state = StreamState::Paused;
248
249                // Record metric
250                crate::metrics::counters::stream_paused(&entity);
251                Ok(())
252            }
253            StreamState::Paused => {
254                // Idempotent: already paused
255                Ok(())
256            }
257            StreamState::Completed | StreamState::Failed => {
258                // Cannot pause a terminal stream
259                Err(Error::Protocol(
260                    "cannot pause a completed or failed stream".to_string(),
261                ))
262            }
263        }
264    }
265
266    /// Resume the stream
267    ///
268    /// Resumes the background task to continue reading data from Postgres.
269    /// Only has an effect if the stream is currently paused.
270    ///
271    /// This method is idempotent: calling resume() before pause() or on an
272    /// already-running stream is a no-op.
273    ///
274    /// Returns an error if the stream has already completed or failed.
275    ///
276    /// # Example
277    ///
278    /// ```ignore
279    /// stream.resume().await?;
280    /// // Background task resumes reading
281    /// // Consumer can poll for more items
282    /// ```
283    pub async fn resume(&mut self) -> Result<()> {
284        // Update lightweight atomic state first (fast path)
285        // Check atomic state before borrowing pause_resume
286        let current = self.state_atomic_get();
287
288        // Resume only makes sense if pause/resume was initialized
289        if let Some(ref mut pr) = self.pause_resume {
290            let entity = self.entity.clone();
291
292            // Note: Set to RUNNING to reflect resumed state
293            if current == STATE_PAUSED {
294                // Only update atomic if currently paused
295                self.state_atomic.store(STATE_RUNNING, Ordering::Release);
296            }
297
298            let mut state = pr.state.lock().await;
299
300            match *state {
301                StreamState::Paused => {
302                    // Signal background task to resume
303                    pr.resume_signal.notify_one();
304                    // Update state
305                    *state = StreamState::Running;
306
307                    // Record metric
308                    crate::metrics::counters::stream_resumed(&entity);
309                    Ok(())
310                }
311                StreamState::Running => {
312                    // Idempotent: already running (or resume before pause)
313                    Ok(())
314                }
315                StreamState::Completed | StreamState::Failed => {
316                    // Cannot resume a terminal stream
317                    Err(Error::Protocol(
318                        "cannot resume a completed or failed stream".to_string(),
319                    ))
320                }
321            }
322        } else {
323            // No pause/resume infrastructure (never paused), idempotent no-op
324            Ok(())
325        }
326    }
327
328    /// Pause the stream with a diagnostic reason
329    ///
330    /// Like pause(), but logs the provided reason for diagnostic purposes.
331    /// This helps track why streams are being paused (e.g., "backpressure",
332    /// "maintenance", "rate limit").
333    ///
334    /// # Arguments
335    ///
336    /// * `reason` - Optional reason for pausing (logged at debug level)
337    ///
338    /// # Example
339    ///
340    /// ```ignore
341    /// stream.pause_with_reason("backpressure: consumer busy").await?;
342    /// ```
343    pub async fn pause_with_reason(&mut self, reason: &str) -> Result<()> {
344        tracing::debug!("pausing stream: {}", reason);
345        self.pause().await
346    }
347
348    /// Clone internal state for passing to background task (only if pause/resume is initialized)
349    pub(crate) fn clone_state(&self) -> Option<Arc<Mutex<StreamState>>> {
350        self.pause_resume.as_ref().map(|pr| Arc::clone(&pr.state))
351    }
352
353    /// Clone pause signal for passing to background task (only if pause/resume is initialized)
354    pub(crate) fn clone_pause_signal(&self) -> Option<Arc<Notify>> {
355        self.pause_resume
356            .as_ref()
357            .map(|pr| Arc::clone(&pr.pause_signal))
358    }
359
360    /// Clone resume signal for passing to background task (only if pause/resume is initialized)
361    pub(crate) fn clone_resume_signal(&self) -> Option<Arc<Notify>> {
362        self.pause_resume
363            .as_ref()
364            .map(|pr| Arc::clone(&pr.resume_signal))
365    }
366
367    // =========================================================================
368    // Lightweight state machine methods
369    // =========================================================================
370
371    /// Clone atomic state for passing to background task
372    pub(crate) fn clone_state_atomic(&self) -> Arc<AtomicU8> {
373        Arc::clone(&self.state_atomic)
374    }
375
376    /// Get current state from atomic (fast path, no locks)
377    pub(crate) fn state_atomic_get(&self) -> u8 {
378        self.state_atomic.load(Ordering::Acquire)
379    }
380
381    /// Set state to paused using atomic
382    pub(crate) fn state_atomic_set_paused(&self) {
383        self.state_atomic.store(STATE_PAUSED, Ordering::Release);
384    }
385
386    /// Set state to completed using atomic
387    pub(crate) fn state_atomic_set_completed(&self) {
388        self.state_atomic.store(STATE_COMPLETED, Ordering::Release);
389    }
390
391    /// Set state to failed using atomic
392    pub(crate) fn state_atomic_set_failed(&self) {
393        self.state_atomic.store(STATE_FAILED, Ordering::Release);
394    }
395
396    /// Get current stream statistics
397    ///
398    /// Returns a snapshot of stream state without consuming any items.
399    /// This can be called at any time to monitor progress.
400    ///
401    /// # Example
402    ///
403    /// ```ignore
404    /// let stats = stream.stats();
405    /// println!("Buffered: {}, Yielded: {}", stats.items_buffered, stats.total_rows_yielded);
406    /// ```
407    pub fn stats(&self) -> StreamStats {
408        let items_buffered = self.receiver.len();
409        let estimated_memory = items_buffered * 2048; // Conservative: 2KB per item
410        let total_rows_yielded = self.rows_yielded.load(Ordering::Relaxed);
411        let total_rows_filtered = self.rows_filtered.load(Ordering::Relaxed);
412
413        StreamStats {
414            items_buffered,
415            estimated_memory,
416            total_rows_yielded,
417            total_rows_filtered,
418        }
419    }
420
421    /// Increment rows yielded counter (called from FilteredStream)
422    #[allow(unused)]
423    pub(crate) fn increment_rows_yielded(&self, count: u64) {
424        self.rows_yielded.fetch_add(count, Ordering::Relaxed);
425    }
426
427    /// Increment rows filtered counter (called from FilteredStream)
428    #[allow(unused)]
429    pub(crate) fn increment_rows_filtered(&self, count: u64) {
430        self.rows_filtered.fetch_add(count, Ordering::Relaxed);
431    }
432
433    /// Clone the yielded counter for passing to background task
434    #[allow(unused)]
435    pub(crate) fn clone_rows_yielded(&self) -> Arc<AtomicU64> {
436        Arc::clone(&self.rows_yielded)
437    }
438
439    /// Clone the filtered counter for passing to background task
440    #[allow(unused)]
441    pub(crate) fn clone_rows_filtered(&self) -> Arc<AtomicU64> {
442        Arc::clone(&self.rows_filtered)
443    }
444}
445
446impl Stream for JsonStream {
447    type Item = Result<Value>;
448
449    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
450        // Sample metrics: record 1 in every 1000 polls to avoid hot path overhead
451        // For 100K rows, this records ~100 times instead of 100K times
452        let poll_idx = self.poll_count.fetch_add(1, Ordering::Relaxed);
453        if poll_idx % 1000 == 0 {
454            let occupancy = self.receiver.len() as u64;
455            crate::metrics::histograms::channel_occupancy(&self.entity, occupancy);
456            crate::metrics::gauges::stream_buffered_items(&self.entity, occupancy as usize);
457        }
458
459        // Check memory limit BEFORE receiving (pre-enqueue strategy)
460        // This stops consuming when buffer reaches limit
461        if let Some(limit) = self.max_memory {
462            let items_buffered = self.receiver.len();
463            let estimated_memory = items_buffered * 2048; // Conservative: 2KB per item
464
465            // Check soft limit thresholds first (warn before fail)
466            if let Some(fail_threshold) = self.soft_limit_fail_threshold {
467                let threshold_bytes = (limit as f32 * fail_threshold) as usize;
468                if estimated_memory > threshold_bytes {
469                    // Record metric for memory limit exceeded
470                    crate::metrics::counters::memory_limit_exceeded(&self.entity);
471                    self.state_atomic_set_failed();
472                    return Poll::Ready(Some(Err(Error::MemoryLimitExceeded {
473                        limit,
474                        estimated_memory,
475                    })));
476                }
477            } else if estimated_memory > limit {
478                // Hard limit (no soft limits configured)
479                crate::metrics::counters::memory_limit_exceeded(&self.entity);
480                self.state_atomic_set_failed();
481                return Poll::Ready(Some(Err(Error::MemoryLimitExceeded {
482                    limit,
483                    estimated_memory,
484                })));
485            }
486
487            // Note: Warn threshold would be handled by instrumentation/logging layer
488            // This is for application-level monitoring, not a hard error
489        }
490
491        match self.receiver.poll_recv(cx) {
492            Poll::Ready(Some(Ok(value))) => Poll::Ready(Some(Ok(value))),
493            Poll::Ready(Some(Err(e))) => {
494                // Stream encountered an error
495                self.state_atomic_set_failed();
496                Poll::Ready(Some(Err(e)))
497            }
498            Poll::Ready(None) => {
499                // Stream completed normally
500                self.state_atomic_set_completed();
501                Poll::Ready(None)
502            }
503            Poll::Pending => Poll::Pending,
504        }
505    }
506}
507
508/// Extract JSON bytes from DataRow message
509pub fn extract_json_bytes(msg: &BackendMessage) -> Result<Bytes> {
510    match msg {
511        BackendMessage::DataRow(fields) => {
512            if fields.len() != 1 {
513                return Err(Error::Protocol(format!(
514                    "expected 1 field, got {}",
515                    fields.len()
516                )));
517            }
518
519            let field = &fields[0];
520            field
521                .clone()
522                .ok_or_else(|| Error::Protocol("null data field".into()))
523        }
524        _ => Err(Error::Protocol("expected DataRow".into())),
525    }
526}
527
528/// Parse JSON bytes into Value
529pub fn parse_json(data: Bytes) -> Result<Value> {
530    let value: Value = serde_json::from_slice(&data)?;
531    Ok(value)
532}
533
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_extract_json_bytes() {
        let data = Bytes::from_static(b"{\"key\":\"value\"}");
        let msg = BackendMessage::DataRow(vec![Some(data.clone())]);

        let extracted = extract_json_bytes(&msg).unwrap();
        assert_eq!(extracted, data);
    }

    #[test]
    fn test_extract_null_field() {
        let msg = BackendMessage::DataRow(vec![None]);
        assert!(extract_json_bytes(&msg).is_err());
    }

    #[test]
    fn test_extract_wrong_field_count() {
        // Zero fields: rejected.
        let empty = BackendMessage::DataRow(vec![]);
        assert!(extract_json_bytes(&empty).is_err());

        // More than one field: rejected even though each field is non-NULL.
        let two = BackendMessage::DataRow(vec![
            Some(Bytes::from_static(b"1")),
            Some(Bytes::from_static(b"2")),
        ]);
        assert!(extract_json_bytes(&two).is_err());
    }

    #[test]
    fn test_parse_json() {
        let data = Bytes::from_static(b"{\"key\":\"value\"}");
        let value = parse_json(data).unwrap();

        assert_eq!(value["key"], "value");
    }

    #[test]
    fn test_parse_invalid_json() {
        let data = Bytes::from_static(b"not json");
        assert!(parse_json(data).is_err());
    }

    #[test]
    fn test_stream_stats_creation() {
        let stats = StreamStats::zero();
        assert_eq!(stats.items_buffered, 0);
        assert_eq!(stats.estimated_memory, 0);
        assert_eq!(stats.total_rows_yielded, 0);
        assert_eq!(stats.total_rows_filtered, 0);
    }

    #[test]
    fn test_stream_stats_memory_estimation() {
        let stats = StreamStats {
            items_buffered: 100,
            estimated_memory: 100 * 2048,
            total_rows_yielded: 100,
            total_rows_filtered: 10,
        };

        // 100 items * 2KB per item = 200KB
        assert_eq!(stats.estimated_memory, 204800);
    }

    #[test]
    fn test_stream_stats_clone() {
        let stats = StreamStats {
            items_buffered: 50,
            estimated_memory: 100000,
            total_rows_yielded: 500,
            total_rows_filtered: 50,
        };

        let cloned = stats.clone();
        assert_eq!(cloned.items_buffered, stats.items_buffered);
        assert_eq!(cloned.estimated_memory, stats.estimated_memory);
        assert_eq!(cloned.total_rows_yielded, stats.total_rows_yielded);
        assert_eq!(cloned.total_rows_filtered, stats.total_rows_filtered);
    }

    #[test]
    fn test_stream_state_constants() {
        // Verify state constants are distinct
        assert_ne!(STATE_RUNNING, STATE_PAUSED);
        assert_ne!(STATE_RUNNING, STATE_COMPLETED);
        assert_ne!(STATE_RUNNING, STATE_FAILED);
        assert_ne!(STATE_PAUSED, STATE_COMPLETED);
        assert_ne!(STATE_PAUSED, STATE_FAILED);
        assert_ne!(STATE_COMPLETED, STATE_FAILED);
    }

    #[test]
    fn test_stream_state_encoding() {
        // Pin the encoding documented on `JsonStream::state_atomic`
        // (0=Running, 1=Paused, 2=Complete, 3=Error).
        assert_eq!(STATE_RUNNING, 0);
        assert_eq!(STATE_PAUSED, 1);
        assert_eq!(STATE_COMPLETED, 2);
        assert_eq!(STATE_FAILED, 3);
    }

    #[test]
    fn test_stream_state_enum_equality() {
        assert_eq!(StreamState::Running, StreamState::Running);
        assert_eq!(StreamState::Paused, StreamState::Paused);
        assert_eq!(StreamState::Completed, StreamState::Completed);
        assert_eq!(StreamState::Failed, StreamState::Failed);
        assert_ne!(StreamState::Running, StreamState::Paused);
    }
}