// fraiseql_wire/stream/json_stream.rs
1//! JSON stream implementation
2
3use crate::protocol::BackendMessage;
4use crate::{Error, Result};
5use bytes::Bytes;
6use futures::stream::Stream;
7use serde_json::Value;
8use std::pin::Pin;
9use std::sync::atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering};
10use std::sync::Arc;
11use std::task::{Context, Poll};
12use std::time::Duration;
13use tokio::sync::{mpsc, Mutex, Notify};
14
// Lightweight state machine constants.
//
// Mirror the `StreamState` enum as raw `u8` values so the stream's current
// state can be tracked with a cheap `AtomicU8` instead of taking a full
// `Mutex<StreamState>` lock on every check.
const STATE_RUNNING: u8 = 0; // Background task actively reading from Postgres
const STATE_PAUSED: u8 = 1; // Background task suspended; connection stays alive
const STATE_COMPLETED: u8 = 2; // Terminal: query finished normally
const STATE_FAILED: u8 = 3; // Terminal: query failed with an error
21
/// Stream state machine
///
/// Describes where a JSON stream is in its lifecycle. Every stream begins in
/// [`StreamState::Running`]; it may move back and forth to
/// [`StreamState::Paused`], and eventually settles in one of the terminal
/// states, [`StreamState::Completed`] or [`StreamState::Failed`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StreamState {
    /// Background task is actively reading from Postgres
    Running,
    /// Background task is paused (suspended, connection alive)
    Paused,
    /// Query completed normally
    Completed,
    /// Query failed with error
    Failed,
}
38
/// Stream statistics snapshot
///
/// A read-only, point-in-time view of the stream's progress. Taking a
/// snapshot never consumes buffered items; all counters are best-effort
/// measurements taken at the moment of the call.
#[derive(Clone, Debug)]
pub struct StreamStats {
    /// Number of items currently buffered in channel (0-256)
    pub items_buffered: usize,
    /// Estimated memory used by buffered items in bytes
    pub estimated_memory: usize,
    /// Total rows yielded to consumer so far
    pub total_rows_yielded: u64,
    /// Total rows filtered out by Rust predicates
    pub total_rows_filtered: u64,
}
54
55impl StreamStats {
56 /// Create zero-valued stats
57 ///
58 /// Useful for testing and initialization.
59 pub const fn zero() -> Self {
60 Self {
61 items_buffered: 0,
62 estimated_memory: 0,
63 total_rows_yielded: 0,
64 total_rows_filtered: 0,
65 }
66 }
67}
68
/// JSON value stream
///
/// Consumer-side handle for a streaming query. A background task decodes rows
/// and sends them over `receiver`; this struct yields them through its
/// `Stream` implementation. Dropping the stream drops `_cancel_tx`, which
/// closes the cancel channel and signals the background task to stop.
pub struct JsonStream {
    receiver: mpsc::Receiver<Result<Value>>, // Rows (or errors) from the background task
    _cancel_tx: mpsc::Sender<()>, // Dropped when stream is dropped
    entity: String, // Entity name for metrics
    rows_yielded: Arc<AtomicU64>, // Counter of items yielded to consumer
    rows_filtered: Arc<AtomicU64>, // Counter of items filtered
    max_memory: Option<usize>, // Optional memory limit in bytes
    soft_limit_fail_threshold: Option<f32>, // Fail at threshold % (0.0-1.0) of max_memory

    // Lightweight state tracking (cheap AtomicU8)
    // Used for fast state checks on all queries
    // Values: 0=Running, 1=Paused, 2=Completed, 3=Failed (see STATE_* constants)
    state_atomic: Arc<AtomicU8>,

    // Pause/resume state machine (lazily initialized)
    // Only allocated if pause() is called
    pause_resume: Option<PauseResumeState>,

    // Sampling counter for metrics recording (sample 1 in N polls)
    poll_count: AtomicU64, // Counter for sampling metrics
}
91
/// Pause/resume state (lazily allocated)
///
/// Only created the first time `pause()` (or a pause-timeout setter) is
/// called, so streams that never pause pay no Mutex/Notify allocation cost.
pub struct PauseResumeState {
    state: Arc<Mutex<StreamState>>, // Authoritative stream state, shareable with the background task
    pause_signal: Arc<Notify>, // Wakes the background task so it stops reading
    resume_signal: Arc<Notify>, // Wakes the background task so it resumes reading
    paused_occupancy: Arc<AtomicUsize>, // Channel occupancy recorded at pause time
    pause_timeout: Option<Duration>, // Optional auto-resume timeout
}
101
102impl JsonStream {
103 /// Create new JSON stream
104 pub(crate) fn new(
105 receiver: mpsc::Receiver<Result<Value>>,
106 cancel_tx: mpsc::Sender<()>,
107 entity: String,
108 max_memory: Option<usize>,
109 _soft_limit_warn_threshold: Option<f32>,
110 soft_limit_fail_threshold: Option<f32>,
111 ) -> Self {
112 Self {
113 receiver,
114 _cancel_tx: cancel_tx,
115 entity,
116 rows_yielded: Arc::new(AtomicU64::new(0)),
117 rows_filtered: Arc::new(AtomicU64::new(0)),
118 max_memory,
119 soft_limit_fail_threshold,
120
121 // Initialize lightweight atomic state
122 state_atomic: Arc::new(AtomicU8::new(STATE_RUNNING)),
123
124 // Pause/resume lazily initialized (only if pause() is called)
125 pause_resume: None,
126
127 // Initialize sampling counter
128 poll_count: AtomicU64::new(0),
129 }
130 }
131
132 /// Initialize pause/resume state (called on first `pause()`)
133 fn ensure_pause_resume(&mut self) -> &mut PauseResumeState {
134 if self.pause_resume.is_none() {
135 self.pause_resume = Some(PauseResumeState {
136 state: Arc::new(Mutex::new(StreamState::Running)),
137 pause_signal: Arc::new(Notify::new()),
138 resume_signal: Arc::new(Notify::new()),
139 paused_occupancy: Arc::new(AtomicUsize::new(0)),
140 pause_timeout: None,
141 });
142 }
143 self.pause_resume
144 .as_mut()
145 .expect("pause_resume initialized in the block above")
146 }
147
148 /// Get current stream state
149 ///
150 /// Returns the current state of the stream (Running, Paused, Completed, or Failed).
151 /// This is a synchronous getter that doesn't require awaiting.
152 ///
153 /// Note: This is a best-effort snapshot that may return slightly stale state
154 /// due to the non-blocking nature of atomic reads.
155 pub fn state_snapshot(&self) -> StreamState {
156 // Read from lightweight atomic state (fast path, no locks)
157 match self.state_atomic.load(Ordering::Acquire) {
158 STATE_RUNNING => StreamState::Running,
159 STATE_PAUSED => StreamState::Paused,
160 STATE_COMPLETED => StreamState::Completed,
161 STATE_FAILED => StreamState::Failed,
162 _ => {
163 // Unknown state - fall back to checking if channel is closed
164 if self.receiver.is_closed() {
165 StreamState::Completed
166 } else {
167 StreamState::Running
168 }
169 }
170 }
171 }
172
173 /// Get buffered rows when paused
174 ///
175 /// Returns the number of rows buffered in the channel when the stream was paused.
176 /// Only meaningful when stream is in Paused state.
177 pub fn paused_occupancy(&self) -> usize {
178 self.pause_resume
179 .as_ref()
180 .map_or(0, |pr| pr.paused_occupancy.load(Ordering::Relaxed))
181 }
182
183 /// Set timeout for pause (auto-resume after duration)
184 ///
185 /// When a stream is paused, the background task will automatically resume
186 /// after the specified duration expires, even if `resume()` is not called.
187 ///
188 /// # Arguments
189 ///
190 /// * `duration` - How long to stay paused before auto-resuming
191 ///
192 /// # Examples
193 ///
194 /// ```no_run
195 /// use std::time::Duration;
196 /// use fraiseql_wire::stream::JsonStream;
197 ///
198 /// # async fn example(mut stream: JsonStream) -> fraiseql_wire::Result<()> {
199 /// stream.set_pause_timeout(Duration::from_secs(5));
200 /// stream.pause().await?; // Will auto-resume after 5 seconds
201 /// # Ok(())
202 /// # }
203 /// ```
204 pub fn set_pause_timeout(&mut self, duration: Duration) {
205 self.ensure_pause_resume().pause_timeout = Some(duration);
206 tracing::debug!("pause timeout set to {:?}", duration);
207 }
208
209 /// Clear pause timeout (no auto-resume)
210 pub fn clear_pause_timeout(&mut self) {
211 if let Some(ref mut pr) = self.pause_resume {
212 pr.pause_timeout = None;
213 tracing::debug!("pause timeout cleared");
214 }
215 }
216
217 /// Get current pause timeout (if set)
218 pub(crate) fn pause_timeout(&self) -> Option<Duration> {
219 self.pause_resume.as_ref().and_then(|pr| pr.pause_timeout)
220 }
221
222 /// Pause the stream
223 ///
224 /// Suspends the background task from reading more data from Postgres.
225 /// The connection remains open and can be resumed later.
226 /// Buffered rows are preserved and can be consumed normally.
227 ///
228 /// This method is idempotent: calling `pause()` on an already-paused stream is a no-op.
229 ///
230 /// Returns an error if the stream has already completed or failed.
231 ///
232 /// # Example
233 ///
234 /// ```no_run
235 /// // Requires: live Postgres streaming connection.
236 /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
237 /// let mut stream = client.query::<serde_json::Value>("entity").execute().await?;
238 /// stream.pause().await?;
239 /// // Background task stops reading
240 /// // Consumer can still poll for remaining buffered items
241 /// # Ok(())
242 /// # }
243 /// ```
244 ///
245 /// # Errors
246 ///
247 /// Returns `Error::Protocol` if the stream has already completed or failed.
248 pub async fn pause(&mut self) -> Result<()> {
249 let entity = self.entity.clone();
250
251 // Update lightweight atomic state first (fast path)
252 self.state_atomic_set_paused();
253
254 let pr = self.ensure_pause_resume();
255 let mut state = pr.state.lock().await;
256
257 match *state {
258 StreamState::Running => {
259 // Signal background task to pause
260 pr.pause_signal.notify_one();
261 // Update state
262 *state = StreamState::Paused;
263
264 // Record metric
265 crate::metrics::counters::stream_paused(&entity);
266 Ok(())
267 }
268 StreamState::Paused => {
269 // Idempotent: already paused
270 Ok(())
271 }
272 StreamState::Completed | StreamState::Failed => {
273 // Cannot pause a terminal stream
274 Err(Error::Protocol(
275 "cannot pause a completed or failed stream".to_string(),
276 ))
277 }
278 }
279 }
280
281 /// Resume the stream
282 ///
283 /// Resumes the background task to continue reading data from Postgres.
284 /// Only has an effect if the stream is currently paused.
285 ///
286 /// This method is idempotent: calling `resume()` before `pause()` or on an
287 /// already-running stream is a no-op.
288 ///
289 /// Returns an error if the stream has already completed or failed.
290 ///
291 /// # Example
292 ///
293 /// ```no_run
294 /// // Requires: live Postgres streaming connection.
295 /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
296 /// let mut stream = client.query::<serde_json::Value>("entity").execute().await?;
297 /// stream.resume().await?;
298 /// // Background task resumes reading
299 /// // Consumer can poll for more items
300 /// # Ok(())
301 /// # }
302 /// ```
303 ///
304 /// # Errors
305 ///
306 /// Returns `Error::Protocol` if the stream has already completed or failed.
307 pub async fn resume(&mut self) -> Result<()> {
308 // Update lightweight atomic state first (fast path)
309 // Check atomic state before borrowing pause_resume
310 let current = self.state_atomic_get();
311
312 // Resume only makes sense if pause/resume was initialized
313 if let Some(ref mut pr) = self.pause_resume {
314 let entity = self.entity.clone();
315
316 // Note: Set to RUNNING to reflect resumed state
317 if current == STATE_PAUSED {
318 // Only update atomic if currently paused
319 self.state_atomic.store(STATE_RUNNING, Ordering::Release);
320 }
321
322 let mut state = pr.state.lock().await;
323
324 match *state {
325 StreamState::Paused => {
326 // Signal background task to resume
327 pr.resume_signal.notify_one();
328 // Update state
329 *state = StreamState::Running;
330
331 // Record metric
332 crate::metrics::counters::stream_resumed(&entity);
333 Ok(())
334 }
335 StreamState::Running => {
336 // Idempotent: already running (or resume before pause)
337 Ok(())
338 }
339 StreamState::Completed | StreamState::Failed => {
340 // Cannot resume a terminal stream
341 Err(Error::Protocol(
342 "cannot resume a completed or failed stream".to_string(),
343 ))
344 }
345 }
346 } else {
347 // No pause/resume infrastructure (never paused), idempotent no-op
348 Ok(())
349 }
350 }
351
352 /// Pause the stream with a diagnostic reason
353 ///
354 /// Like `pause()`, but logs the provided reason for diagnostic purposes.
355 /// This helps track why streams are being paused (e.g., "backpressure",
356 /// "maintenance", "rate limit").
357 ///
358 /// # Arguments
359 ///
360 /// * `reason` - Optional reason for pausing (logged at debug level)
361 ///
362 /// # Example
363 ///
364 /// ```no_run
365 /// // Requires: live Postgres streaming connection.
366 /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
367 /// let mut stream = client.query::<serde_json::Value>("entity").execute().await?;
368 /// stream.pause_with_reason("backpressure: consumer busy").await?;
369 /// # Ok(())
370 /// # }
371 /// ```
372 ///
373 /// # Errors
374 ///
375 /// Returns `Error::Protocol` if the stream has already completed or failed (delegates to [`pause`](Self::pause)).
376 pub async fn pause_with_reason(&mut self, reason: &str) -> Result<()> {
377 tracing::debug!("pausing stream: {}", reason);
378 self.pause().await
379 }
380
381 /// Clone internal state for passing to background task (only if pause/resume is initialized)
382 pub(crate) fn clone_state(&self) -> Option<Arc<Mutex<StreamState>>> {
383 self.pause_resume.as_ref().map(|pr| Arc::clone(&pr.state))
384 }
385
386 /// Clone pause signal for passing to background task (only if pause/resume is initialized)
387 pub(crate) fn clone_pause_signal(&self) -> Option<Arc<Notify>> {
388 self.pause_resume
389 .as_ref()
390 .map(|pr| Arc::clone(&pr.pause_signal))
391 }
392
393 /// Clone resume signal for passing to background task (only if pause/resume is initialized)
394 pub(crate) fn clone_resume_signal(&self) -> Option<Arc<Notify>> {
395 self.pause_resume
396 .as_ref()
397 .map(|pr| Arc::clone(&pr.resume_signal))
398 }
399
400 // =========================================================================
401 // Lightweight state machine methods
402 // =========================================================================
403
404 /// Clone atomic state for passing to background task
405 pub(crate) fn clone_state_atomic(&self) -> Arc<AtomicU8> {
406 Arc::clone(&self.state_atomic)
407 }
408
409 /// Get current state from atomic (fast path, no locks)
410 pub(crate) fn state_atomic_get(&self) -> u8 {
411 self.state_atomic.load(Ordering::Acquire)
412 }
413
414 /// Set state to paused using atomic
415 pub(crate) fn state_atomic_set_paused(&self) {
416 self.state_atomic.store(STATE_PAUSED, Ordering::Release);
417 }
418
419 /// Set state to completed using atomic
420 pub(crate) fn state_atomic_set_completed(&self) {
421 self.state_atomic.store(STATE_COMPLETED, Ordering::Release);
422 }
423
424 /// Set state to failed using atomic
425 pub(crate) fn state_atomic_set_failed(&self) {
426 self.state_atomic.store(STATE_FAILED, Ordering::Release);
427 }
428
429 /// Get current stream statistics
430 ///
431 /// Returns a snapshot of stream state without consuming any items.
432 /// This can be called at any time to monitor progress.
433 ///
434 /// # Example
435 ///
436 /// ```no_run
437 /// // Requires: live Postgres streaming connection.
438 /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
439 /// let stream = client.query::<serde_json::Value>("entity").execute().await?;
440 /// let stats = stream.stats();
441 /// println!("Buffered: {}, Yielded: {}", stats.items_buffered, stats.total_rows_yielded);
442 /// # Ok(())
443 /// # }
444 /// ```
445 pub fn stats(&self) -> StreamStats {
446 let items_buffered = self.receiver.len();
447 let estimated_memory = items_buffered * 2048; // Conservative: 2KB per item
448 let total_rows_yielded = self.rows_yielded.load(Ordering::Relaxed);
449 let total_rows_filtered = self.rows_filtered.load(Ordering::Relaxed);
450
451 StreamStats {
452 items_buffered,
453 estimated_memory,
454 total_rows_yielded,
455 total_rows_filtered,
456 }
457 }
458}
459
460impl Stream for JsonStream {
461 type Item = Result<Value>;
462
463 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
464 // Sample metrics: record 1 in every 1000 polls to avoid hot path overhead
465 // For 100K rows, this records ~100 times instead of 100K times
466 let poll_idx = self.poll_count.fetch_add(1, Ordering::Relaxed);
467 if poll_idx.is_multiple_of(1000) {
468 let occupancy = self.receiver.len() as u64;
469 crate::metrics::histograms::channel_occupancy(&self.entity, occupancy);
470 crate::metrics::gauges::stream_buffered_items(&self.entity, occupancy as usize);
471 }
472
473 // Check memory limit BEFORE receiving (pre-enqueue strategy)
474 // This stops consuming when buffer reaches limit
475 if let Some(limit) = self.max_memory {
476 let items_buffered = self.receiver.len();
477 let estimated_memory = items_buffered * 2048; // Conservative: 2KB per item
478
479 // Check soft limit thresholds first (warn before fail)
480 if let Some(fail_threshold) = self.soft_limit_fail_threshold {
481 let threshold_bytes = (limit as f32 * fail_threshold) as usize;
482 if estimated_memory > threshold_bytes {
483 // Record metric for memory limit exceeded
484 crate::metrics::counters::memory_limit_exceeded(&self.entity);
485 self.state_atomic_set_failed();
486 return Poll::Ready(Some(Err(Error::MemoryLimitExceeded {
487 limit,
488 estimated_memory,
489 })));
490 }
491 } else if estimated_memory > limit {
492 // Hard limit (no soft limits configured)
493 crate::metrics::counters::memory_limit_exceeded(&self.entity);
494 self.state_atomic_set_failed();
495 return Poll::Ready(Some(Err(Error::MemoryLimitExceeded {
496 limit,
497 estimated_memory,
498 })));
499 }
500
501 // Note: Warn threshold would be handled by instrumentation/logging layer
502 // This is for application-level monitoring, not a hard error
503 }
504
505 match self.receiver.poll_recv(cx) {
506 Poll::Ready(Some(Ok(value))) => Poll::Ready(Some(Ok(value))),
507 Poll::Ready(Some(Err(e))) => {
508 // Stream encountered an error
509 self.state_atomic_set_failed();
510 Poll::Ready(Some(Err(e)))
511 }
512 Poll::Ready(None) => {
513 // Stream completed normally
514 self.state_atomic_set_completed();
515 Poll::Ready(None)
516 }
517 Poll::Pending => Poll::Pending,
518 }
519 }
520}
521
522/// Extract JSON bytes from `DataRow` message
523///
524/// # Errors
525///
526/// Returns `Error::Protocol` if the message is not a `DataRow`, contains more than one field,
527/// or the data field is null.
528pub fn extract_json_bytes(msg: &BackendMessage) -> Result<Bytes> {
529 match msg {
530 BackendMessage::DataRow(fields) => {
531 if fields.len() != 1 {
532 return Err(Error::Protocol(format!(
533 "expected 1 field, got {}",
534 fields.len()
535 )));
536 }
537
538 let field = &fields[0];
539 field
540 .clone()
541 .ok_or_else(|| Error::Protocol("null data field".into()))
542 }
543 _ => Err(Error::Protocol("expected DataRow".into())),
544 }
545}
546
547/// Parse JSON bytes into Value
548///
549/// # Errors
550///
551/// Returns `Error::JsonDecode` if the bytes are not valid JSON.
552pub fn parse_json(data: Bytes) -> Result<Value> {
553 let value: Value = serde_json::from_slice(&data)?;
554 Ok(value)
555}
556
#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used)] // Reason: test code, panics are acceptable
    use super::*;

    #[test]
    fn test_extract_json_bytes() {
        let payload = Bytes::from_static(br#"{"key":"value"}"#);
        let msg = BackendMessage::DataRow(vec![Some(payload.clone())]);
        assert_eq!(extract_json_bytes(&msg).unwrap(), payload);
    }

    #[test]
    fn test_extract_null_field() {
        // A single NULL field must be rejected.
        let msg = BackendMessage::DataRow(vec![None]);
        assert!(extract_json_bytes(&msg).is_err());
    }

    #[test]
    fn test_parse_json() {
        let parsed = parse_json(Bytes::from_static(br#"{"key":"value"}"#)).unwrap();
        assert_eq!(parsed["key"], "value");
    }

    #[test]
    fn test_parse_invalid_json() {
        assert!(parse_json(Bytes::from_static(b"not json")).is_err());
    }

    #[test]
    fn test_stream_stats_creation() {
        let zeroed = StreamStats::zero();
        assert_eq!(
            (
                zeroed.items_buffered,
                zeroed.estimated_memory,
                zeroed.total_rows_yielded,
                zeroed.total_rows_filtered,
            ),
            (0, 0, 0, 0)
        );
    }

    #[test]
    fn test_stream_stats_memory_estimation() {
        let stats = StreamStats {
            items_buffered: 100,
            estimated_memory: 100 * 2048,
            total_rows_yielded: 100,
            total_rows_filtered: 10,
        };

        // 100 items * 2KB per item = 200KB
        assert_eq!(stats.estimated_memory, 204_800);
    }

    #[test]
    fn test_stream_stats_clone() {
        let original = StreamStats {
            items_buffered: 50,
            estimated_memory: 100_000,
            total_rows_yielded: 500,
            total_rows_filtered: 50,
        };

        let copy = original.clone();
        assert_eq!(copy.items_buffered, original.items_buffered);
        assert_eq!(copy.estimated_memory, original.estimated_memory);
        assert_eq!(copy.total_rows_yielded, original.total_rows_yielded);
        assert_eq!(copy.total_rows_filtered, original.total_rows_filtered);
    }

    #[test]
    fn test_stream_state_constants() {
        // All four state constants must be pairwise distinct.
        let states = [STATE_RUNNING, STATE_PAUSED, STATE_COMPLETED, STATE_FAILED];
        for (i, a) in states.iter().enumerate() {
            for b in &states[i + 1..] {
                assert_ne!(a, b);
            }
        }
    }

    #[test]
    fn test_stream_state_enum_equality() {
        for state in [
            StreamState::Running,
            StreamState::Paused,
            StreamState::Completed,
            StreamState::Failed,
        ] {
            let copied = state;
            assert_eq!(state, copied);
        }
        assert_ne!(StreamState::Running, StreamState::Paused);
    }
}
648}