// fraiseql_wire/stream/json_stream.rs
1//! JSON stream implementation
2
3use crate::protocol::BackendMessage;
4use crate::{Result, WireError};
5use bytes::Bytes;
6use futures::stream::Stream;
7use serde_json::Value;
8use std::pin::Pin;
9use std::sync::atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering};
10use std::sync::Arc;
11use std::task::{Context, Poll};
12use std::time::Duration;
13use tokio::sync::{mpsc, Mutex, Notify};
14
// Lightweight state machine constants mirroring `StreamState`.
// Stored in an `AtomicU8` so state can be read/written on the hot path
// without taking the `Mutex<StreamState>` used by pause/resume.
const STATE_RUNNING: u8 = 0;
const STATE_PAUSED: u8 = 1;
const STATE_COMPLETED: u8 = 2;
const STATE_FAILED: u8 = 3;
21
/// Stream state machine
///
/// Tracks the current state of the JSON stream.
/// Streams start in [`StreamState::Running`] and can transition to
/// [`StreamState::Paused`] or to the terminal states
/// ([`StreamState::Completed`], [`StreamState::Failed`]).
///
/// Mirrored by the `STATE_*` `AtomicU8` constants for lock-free snapshots.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum StreamState {
    /// Background task is actively reading from Postgres
    Running,
    /// Background task is paused (suspended, connection alive)
    Paused,
    /// Query completed normally (terminal)
    Completed,
    /// Query failed with error (terminal)
    Failed,
}
39
/// Stream statistics snapshot
///
/// Provides a read-only view of current stream state without consuming items.
/// All values are point-in-time measurements and may be stale by the time
/// the caller inspects them.
#[derive(Debug, Clone)]
pub struct StreamStats {
    /// Number of items currently buffered in channel (0-256)
    pub items_buffered: usize,
    /// Estimated memory used by buffered items in bytes (2KB-per-item heuristic)
    pub estimated_memory: usize,
    /// Total rows yielded to consumer so far
    pub total_rows_yielded: u64,
    /// Total rows filtered out by Rust predicates
    pub total_rows_filtered: u64,
}
55
56impl StreamStats {
57 /// Create zero-valued stats
58 ///
59 /// Useful for testing and initialization.
60 pub const fn zero() -> Self {
61 Self {
62 items_buffered: 0,
63 estimated_memory: 0,
64 total_rows_yielded: 0,
65 total_rows_filtered: 0,
66 }
67 }
68}
69
/// JSON value stream
///
/// Consumer-facing handle for a query's result rows: items arrive from a
/// background task over an mpsc channel and are yielded through the
/// `Stream` impl below.
pub struct JsonStream {
    receiver: mpsc::Receiver<Result<Value>>, // Rows (or errors) from the background task
    _cancel_tx: mpsc::Sender<()>, // Dropped when stream is dropped
    entity: String, // Entity name for metrics
    rows_yielded: Arc<AtomicU64>, // Counter of items yielded to consumer
    rows_filtered: Arc<AtomicU64>, // Counter of items filtered
    max_memory: Option<usize>, // Optional memory limit in bytes
    soft_limit_fail_threshold: Option<f32>, // Fail at threshold % (0.0-1.0)

    // Lightweight state tracking (cheap AtomicU8)
    // Used for fast state checks on all queries
    // Values: 0=Running, 1=Paused, 2=Completed, 3=Failed (see STATE_* consts)
    state_atomic: Arc<AtomicU8>,

    // Pause/resume state machine (lazily initialized)
    // Only allocated if pause() is called
    pause_resume: Option<PauseResumeState>,

    // Sampling counter for metrics recording (sample 1 in N polls)
    poll_count: AtomicU64, // Counter for sampling metrics
}
92
/// Pause/resume state (lazily allocated)
///
/// Only created when `pause()` is first called, so never-paused streams
/// avoid the Mutex/Notify allocations entirely.
pub struct PauseResumeState {
    state: Arc<Mutex<StreamState>>, // Authoritative stream state (guards transitions)
    pause_signal: Arc<Notify>, // Signal to pause background task
    resume_signal: Arc<Notify>, // Signal to resume background task
    paused_occupancy: Arc<AtomicUsize>, // Buffered rows when paused
    pause_timeout: Option<Duration>, // Optional auto-resume timeout
}
102
103impl JsonStream {
104 /// Create new JSON stream
105 pub(crate) fn new(
106 receiver: mpsc::Receiver<Result<Value>>,
107 cancel_tx: mpsc::Sender<()>,
108 entity: String,
109 max_memory: Option<usize>,
110 _soft_limit_warn_threshold: Option<f32>,
111 soft_limit_fail_threshold: Option<f32>,
112 ) -> Self {
113 Self {
114 receiver,
115 _cancel_tx: cancel_tx,
116 entity,
117 rows_yielded: Arc::new(AtomicU64::new(0)),
118 rows_filtered: Arc::new(AtomicU64::new(0)),
119 max_memory,
120 soft_limit_fail_threshold,
121
122 // Initialize lightweight atomic state
123 state_atomic: Arc::new(AtomicU8::new(STATE_RUNNING)),
124
125 // Pause/resume lazily initialized (only if pause() is called)
126 pause_resume: None,
127
128 // Initialize sampling counter
129 poll_count: AtomicU64::new(0),
130 }
131 }
132
133 /// Initialize pause/resume state (called on first `pause()`)
134 fn ensure_pause_resume(&mut self) -> &mut PauseResumeState {
135 if self.pause_resume.is_none() {
136 self.pause_resume = Some(PauseResumeState {
137 state: Arc::new(Mutex::new(StreamState::Running)),
138 pause_signal: Arc::new(Notify::new()),
139 resume_signal: Arc::new(Notify::new()),
140 paused_occupancy: Arc::new(AtomicUsize::new(0)),
141 pause_timeout: None,
142 });
143 }
144 self.pause_resume
145 .as_mut()
146 .expect("pause_resume initialized in the block above")
147 }
148
149 /// Get current stream state
150 ///
151 /// Returns the current state of the stream (Running, Paused, Completed, or Failed).
152 /// This is a synchronous getter that doesn't require awaiting.
153 ///
154 /// Note: This is a best-effort snapshot that may return slightly stale state
155 /// due to the non-blocking nature of atomic reads.
156 pub fn state_snapshot(&self) -> StreamState {
157 // Read from lightweight atomic state (fast path, no locks)
158 match self.state_atomic.load(Ordering::Acquire) {
159 STATE_RUNNING => StreamState::Running,
160 STATE_PAUSED => StreamState::Paused,
161 STATE_COMPLETED => StreamState::Completed,
162 STATE_FAILED => StreamState::Failed,
163 _ => {
164 // Unknown state - fall back to checking if channel is closed
165 if self.receiver.is_closed() {
166 StreamState::Completed
167 } else {
168 StreamState::Running
169 }
170 }
171 }
172 }
173
174 /// Get buffered rows when paused
175 ///
176 /// Returns the number of rows buffered in the channel when the stream was paused.
177 /// Only meaningful when stream is in Paused state.
178 pub fn paused_occupancy(&self) -> usize {
179 self.pause_resume
180 .as_ref()
181 .map_or(0, |pr| pr.paused_occupancy.load(Ordering::Relaxed))
182 }
183
184 /// Set timeout for pause (auto-resume after duration)
185 ///
186 /// When a stream is paused, the background task will automatically resume
187 /// after the specified duration expires, even if `resume()` is not called.
188 ///
189 /// # Arguments
190 ///
191 /// * `duration` - How long to stay paused before auto-resuming
192 ///
193 /// # Examples
194 ///
195 /// ```text
196 /// // Requires: a JsonStream instance from execute_query.
197 /// // Note: set_pause_timeout is on JsonStream, not QueryStream.
198 /// // Use FraiseClient::execute_query (internal) to obtain a JsonStream directly.
199 /// use std::time::Duration;
200 /// stream.set_pause_timeout(Duration::from_secs(5));
201 /// stream.pause().await?; // Will auto-resume after 5 seconds
202 /// ```
203 pub fn set_pause_timeout(&mut self, duration: Duration) {
204 self.ensure_pause_resume().pause_timeout = Some(duration);
205 tracing::debug!("pause timeout set to {:?}", duration);
206 }
207
208 /// Clear pause timeout (no auto-resume)
209 pub fn clear_pause_timeout(&mut self) {
210 if let Some(ref mut pr) = self.pause_resume {
211 pr.pause_timeout = None;
212 tracing::debug!("pause timeout cleared");
213 }
214 }
215
216 /// Get current pause timeout (if set)
217 pub(crate) fn pause_timeout(&self) -> Option<Duration> {
218 self.pause_resume.as_ref().and_then(|pr| pr.pause_timeout)
219 }
220
221 /// Pause the stream
222 ///
223 /// Suspends the background task from reading more data from Postgres.
224 /// The connection remains open and can be resumed later.
225 /// Buffered rows are preserved and can be consumed normally.
226 ///
227 /// This method is idempotent: calling `pause()` on an already-paused stream is a no-op.
228 ///
229 /// Returns an error if the stream has already completed or failed.
230 ///
231 /// # Errors
232 ///
233 /// Returns [`WireError::Protocol`] if the stream is in a terminal state (completed or failed).
234 ///
235 /// # Example
236 ///
237 /// ```no_run
238 /// // Requires: live Postgres streaming connection.
239 /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
240 /// let mut stream = client.query::<serde_json::Value>("entity").execute().await?;
241 /// stream.pause().await?;
242 /// // Background task stops reading
243 /// // Consumer can still poll for remaining buffered items
244 /// # Ok(())
245 /// # }
246 /// ```
247 pub async fn pause(&mut self) -> Result<()> {
248 let entity = self.entity.clone();
249
250 // Update lightweight atomic state first (fast path)
251 self.state_atomic_set_paused();
252
253 let pr = self.ensure_pause_resume();
254 let mut state = pr.state.lock().await;
255
256 match *state {
257 StreamState::Running => {
258 // Signal background task to pause
259 pr.pause_signal.notify_one();
260 // Update state
261 *state = StreamState::Paused;
262
263 // Record metric
264 crate::metrics::counters::stream_paused(&entity);
265 Ok(())
266 }
267 StreamState::Paused => {
268 // Idempotent: already paused
269 Ok(())
270 }
271 StreamState::Completed | StreamState::Failed => {
272 // Cannot pause a terminal stream
273 Err(WireError::Protocol(
274 "cannot pause a completed or failed stream".to_string(),
275 ))
276 }
277 }
278 }
279
280 /// Resume the stream
281 ///
282 /// Resumes the background task to continue reading data from Postgres.
283 /// Only has an effect if the stream is currently paused.
284 ///
285 /// This method is idempotent: calling `resume()` before `pause()` or on an
286 /// already-running stream is a no-op.
287 ///
288 /// Returns an error if the stream has already completed or failed.
289 ///
290 /// # Errors
291 ///
292 /// Returns [`WireError::Protocol`] if the stream is in a terminal state (completed or failed).
293 ///
294 /// # Example
295 ///
296 /// ```no_run
297 /// // Requires: live Postgres streaming connection.
298 /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
299 /// let mut stream = client.query::<serde_json::Value>("entity").execute().await?;
300 /// stream.resume().await?;
301 /// // Background task resumes reading
302 /// // Consumer can poll for more items
303 /// # Ok(())
304 /// # }
305 /// ```
306 pub async fn resume(&mut self) -> Result<()> {
307 // Update lightweight atomic state first (fast path)
308 // Check atomic state before borrowing pause_resume
309 let current = self.state_atomic_get();
310
311 // Resume only makes sense if pause/resume was initialized
312 if let Some(ref mut pr) = self.pause_resume {
313 let entity = self.entity.clone();
314
315 // Note: Set to RUNNING to reflect resumed state
316 if current == STATE_PAUSED {
317 // Only update atomic if currently paused
318 self.state_atomic.store(STATE_RUNNING, Ordering::Release);
319 }
320
321 let mut state = pr.state.lock().await;
322
323 match *state {
324 StreamState::Paused => {
325 // Signal background task to resume
326 pr.resume_signal.notify_one();
327 // Update state
328 *state = StreamState::Running;
329
330 // Record metric
331 crate::metrics::counters::stream_resumed(&entity);
332 Ok(())
333 }
334 StreamState::Running => {
335 // Idempotent: already running (or resume before pause)
336 Ok(())
337 }
338 StreamState::Completed | StreamState::Failed => {
339 // Cannot resume a terminal stream
340 Err(WireError::Protocol(
341 "cannot resume a completed or failed stream".to_string(),
342 ))
343 }
344 }
345 } else {
346 // No pause/resume infrastructure (never paused), idempotent no-op
347 Ok(())
348 }
349 }
350
351 /// Pause the stream with a diagnostic reason
352 ///
353 /// Like `pause()`, but logs the provided reason for diagnostic purposes.
354 /// This helps track why streams are being paused (e.g., "backpressure",
355 /// "maintenance", "rate limit").
356 ///
357 /// # Arguments
358 ///
359 /// * `reason` - Optional reason for pausing (logged at debug level)
360 ///
361 /// # Errors
362 ///
363 /// Returns [`WireError::Protocol`] if the stream is in a terminal state (completed or failed).
364 ///
365 /// # Example
366 ///
367 /// ```no_run
368 /// // Requires: live Postgres streaming connection.
369 /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
370 /// let mut stream = client.query::<serde_json::Value>("entity").execute().await?;
371 /// stream.pause_with_reason("backpressure: consumer busy").await?;
372 /// # Ok(())
373 /// # }
374 /// ```
375 pub async fn pause_with_reason(&mut self, reason: &str) -> Result<()> {
376 tracing::debug!("pausing stream: {}", reason);
377 self.pause().await
378 }
379
380 /// Clone internal state for passing to background task (only if pause/resume is initialized)
381 pub(crate) fn clone_state(&self) -> Option<Arc<Mutex<StreamState>>> {
382 self.pause_resume.as_ref().map(|pr| Arc::clone(&pr.state))
383 }
384
385 /// Clone pause signal for passing to background task (only if pause/resume is initialized)
386 pub(crate) fn clone_pause_signal(&self) -> Option<Arc<Notify>> {
387 self.pause_resume
388 .as_ref()
389 .map(|pr| Arc::clone(&pr.pause_signal))
390 }
391
392 /// Clone resume signal for passing to background task (only if pause/resume is initialized)
393 pub(crate) fn clone_resume_signal(&self) -> Option<Arc<Notify>> {
394 self.pause_resume
395 .as_ref()
396 .map(|pr| Arc::clone(&pr.resume_signal))
397 }
398
    // =========================================================================
    // Lightweight state machine methods
    //
    // These mirror the authoritative Mutex<StreamState> (when allocated)
    // with a cheap AtomicU8 so hot paths never need to lock.
    // =========================================================================

    /// Clone atomic state handle for passing to background task
    pub(crate) fn clone_state_atomic(&self) -> Arc<AtomicU8> {
        Arc::clone(&self.state_atomic)
    }

    /// Get current raw state from atomic (fast path, no locks)
    ///
    /// Returns one of the `STATE_*` constants (0=Running, 1=Paused,
    /// 2=Completed, 3=Failed).
    pub(crate) fn state_atomic_get(&self) -> u8 {
        self.state_atomic.load(Ordering::Acquire)
    }

    /// Set state to paused using atomic
    pub(crate) fn state_atomic_set_paused(&self) {
        self.state_atomic.store(STATE_PAUSED, Ordering::Release);
    }

    /// Set state to completed using atomic (terminal)
    pub(crate) fn state_atomic_set_completed(&self) {
        self.state_atomic.store(STATE_COMPLETED, Ordering::Release);
    }

    /// Set state to failed using atomic (terminal)
    pub(crate) fn state_atomic_set_failed(&self) {
        self.state_atomic.store(STATE_FAILED, Ordering::Release);
    }
427
428 /// Get current stream statistics
429 ///
430 /// Returns a snapshot of stream state without consuming any items.
431 /// This can be called at any time to monitor progress.
432 ///
433 /// # Example
434 ///
435 /// ```no_run
436 /// // Requires: live Postgres streaming connection.
437 /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
438 /// let stream = client.query::<serde_json::Value>("entity").execute().await?;
439 /// let stats = stream.stats();
440 /// println!("Buffered: {}, Yielded: {}", stats.items_buffered, stats.total_rows_yielded);
441 /// # Ok(())
442 /// # }
443 /// ```
444 pub fn stats(&self) -> StreamStats {
445 let items_buffered = self.receiver.len();
446 let estimated_memory = items_buffered * 2048; // Conservative: 2KB per item
447 let total_rows_yielded = self.rows_yielded.load(Ordering::Relaxed);
448 let total_rows_filtered = self.rows_filtered.load(Ordering::Relaxed);
449
450 StreamStats {
451 items_buffered,
452 estimated_memory,
453 total_rows_yielded,
454 total_rows_filtered,
455 }
456 }
457}
458
459impl Stream for JsonStream {
460 type Item = Result<Value>;
461
462 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
463 // Sample metrics: record 1 in every 1000 polls to avoid hot path overhead
464 // For 100K rows, this records ~100 times instead of 100K times
465 let poll_idx = self.poll_count.fetch_add(1, Ordering::Relaxed);
466 if poll_idx.is_multiple_of(1000) {
467 let occupancy = self.receiver.len() as u64;
468 crate::metrics::histograms::channel_occupancy(&self.entity, occupancy);
469 crate::metrics::gauges::stream_buffered_items(&self.entity, occupancy as usize);
470 }
471
472 // Check memory limit BEFORE receiving (pre-enqueue strategy)
473 // This stops consuming when buffer reaches limit
474 if let Some(limit) = self.max_memory {
475 let items_buffered = self.receiver.len();
476 let estimated_memory = items_buffered * 2048; // Conservative: 2KB per item
477
478 // Check soft limit thresholds first (warn before fail)
479 if let Some(fail_threshold) = self.soft_limit_fail_threshold {
480 let threshold_bytes = (limit as f32 * fail_threshold) as usize;
481 if estimated_memory > threshold_bytes {
482 // Record metric for memory limit exceeded
483 crate::metrics::counters::memory_limit_exceeded(&self.entity);
484 self.state_atomic_set_failed();
485 return Poll::Ready(Some(Err(WireError::MemoryLimitExceeded {
486 limit,
487 estimated_memory,
488 })));
489 }
490 } else if estimated_memory > limit {
491 // Hard limit (no soft limits configured)
492 crate::metrics::counters::memory_limit_exceeded(&self.entity);
493 self.state_atomic_set_failed();
494 return Poll::Ready(Some(Err(WireError::MemoryLimitExceeded {
495 limit,
496 estimated_memory,
497 })));
498 }
499
500 // Note: Warn threshold would be handled by instrumentation/logging layer
501 // This is for application-level monitoring, not a hard error
502 }
503
504 match self.receiver.poll_recv(cx) {
505 Poll::Ready(Some(Ok(value))) => Poll::Ready(Some(Ok(value))),
506 Poll::Ready(Some(Err(e))) => {
507 // Stream encountered an error
508 self.state_atomic_set_failed();
509 Poll::Ready(Some(Err(e)))
510 }
511 Poll::Ready(None) => {
512 // Stream completed normally
513 self.state_atomic_set_completed();
514 Poll::Ready(None)
515 }
516 Poll::Pending => Poll::Pending,
517 }
518 }
519}
520
521/// Extract JSON bytes from `DataRow` message
522///
523/// # Errors
524///
525/// Returns [`WireError::Protocol`] if the message is not a `DataRow`, the row does not
526/// have exactly one field, or the field value is null.
527pub fn extract_json_bytes(msg: &BackendMessage) -> Result<Bytes> {
528 match msg {
529 BackendMessage::DataRow(fields) => {
530 if fields.len() != 1 {
531 return Err(WireError::Protocol(format!(
532 "expected 1 field, got {}",
533 fields.len()
534 )));
535 }
536
537 let field = &fields[0];
538 field
539 .clone()
540 .ok_or_else(|| WireError::Protocol("null data field".into()))
541 }
542 _ => Err(WireError::Protocol("expected DataRow".into())),
543 }
544}
545
546/// Parse JSON bytes into Value
547///
548/// # Errors
549///
550/// Returns [`WireError`] if the bytes are not valid JSON.
551pub fn parse_json(data: Bytes) -> Result<Value> {
552 let value: Value = serde_json::from_slice(&data)?;
553 Ok(value)
554}
555
#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used)] // Reason: test code, panics are acceptable
    use super::*;

    #[test]
    fn test_extract_json_bytes() {
        let payload = Bytes::from_static(b"{\"key\":\"value\"}");
        let row = BackendMessage::DataRow(vec![Some(payload.clone())]);

        assert_eq!(extract_json_bytes(&row).unwrap(), payload);
    }

    #[test]
    fn test_extract_null_field() {
        let row = BackendMessage::DataRow(vec![None]);
        let result = extract_json_bytes(&row);
        assert!(
            matches!(result, Err(WireError::Protocol(_))),
            "expected Protocol error for null field, got: {result:?}"
        );
    }

    #[test]
    fn test_parse_json() {
        let parsed = parse_json(Bytes::from_static(b"{\"key\":\"value\"}")).unwrap();
        assert_eq!(parsed["key"], "value");
    }

    #[test]
    fn test_parse_invalid_json() {
        let result = parse_json(Bytes::from_static(b"not json"));
        assert!(
            matches!(result, Err(WireError::JsonDecode(_))),
            "expected JsonDecode error for invalid JSON, got: {result:?}"
        );
    }

    #[test]
    fn test_stream_stats_creation() {
        let zeroed = StreamStats::zero();
        assert_eq!(zeroed.items_buffered, 0);
        assert_eq!(zeroed.estimated_memory, 0);
        assert_eq!(zeroed.total_rows_yielded, 0);
        assert_eq!(zeroed.total_rows_filtered, 0);
    }

    #[test]
    fn test_stream_stats_memory_estimation() {
        let stats = StreamStats {
            items_buffered: 100,
            estimated_memory: 100 * 2048,
            total_rows_yielded: 100,
            total_rows_filtered: 10,
        };

        // 100 items * 2KB per item = 200KB
        assert_eq!(stats.estimated_memory, 204800);
    }

    #[test]
    fn test_stream_stats_clone() {
        let original = StreamStats {
            items_buffered: 50,
            estimated_memory: 100000,
            total_rows_yielded: 500,
            total_rows_filtered: 50,
        };
        let copy = original.clone();

        assert_eq!(copy.items_buffered, original.items_buffered);
        assert_eq!(copy.estimated_memory, original.estimated_memory);
        assert_eq!(copy.total_rows_yielded, original.total_rows_yielded);
        assert_eq!(copy.total_rows_filtered, original.total_rows_filtered);
    }

    #[test]
    fn test_stream_state_constants() {
        // All four raw state codes must be pairwise distinct.
        let codes = [STATE_RUNNING, STATE_PAUSED, STATE_COMPLETED, STATE_FAILED];
        for (i, a) in codes.iter().enumerate() {
            for b in &codes[i + 1..] {
                assert_ne!(a, b);
            }
        }
    }

    #[test]
    fn test_stream_state_enum_equality() {
        assert_eq!(StreamState::Running, StreamState::Running);
        assert_eq!(StreamState::Paused, StreamState::Paused);
        assert_eq!(StreamState::Completed, StreamState::Completed);
        assert_eq!(StreamState::Failed, StreamState::Failed);
        assert_ne!(StreamState::Running, StreamState::Paused);
    }
}
655}