// fraiseql_wire/stream/json_stream.rs
1//! JSON stream implementation
2
3use crate::protocol::BackendMessage;
4use crate::{Error, Result};
5use bytes::Bytes;
6use futures::stream::Stream;
7use serde_json::Value;
8use std::pin::Pin;
9use std::sync::atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering};
10use std::sync::Arc;
11use std::task::{Context, Poll};
12use std::time::Duration;
13use tokio::sync::{mpsc, Mutex, Notify};
14
// Lightweight state machine constants.
// Used for fast state tracking without full Mutex overhead; the background
// task and the consumer share these values through an `Arc<AtomicU8>`.
// NOTE(review): JsonStream's field comment mentions values 2 (Complete) and
// 3 (Error), but only 0 and 1 are defined in this file — presumably the
// terminal values are written by code elsewhere; confirm before relying on them.
const STATE_RUNNING: u8 = 0;
const STATE_PAUSED: u8 = 1;
19
/// Stream state machine
///
/// Tracks the current state of the JSON stream.
/// Streams start in Running state and can transition to Paused
/// or terminal states (Completed, Failed).
///
/// This is the mutex-guarded, authoritative state used by `pause()`/`resume()`;
/// a cheaper `AtomicU8` mirror exists for lock-free reads on the hot path.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamState {
    /// Background task is actively reading from Postgres
    Running,
    /// Background task is paused (suspended, connection alive)
    Paused,
    /// Query completed normally
    Completed,
    /// Query failed with error
    Failed,
}
36
/// Stream statistics snapshot
///
/// Provides a read-only view of current stream state without consuming items.
/// All values are point-in-time measurements.
#[derive(Debug, Clone, Default)]
pub struct StreamStats {
    /// Number of items currently buffered in channel (0-256)
    pub items_buffered: usize,
    /// Estimated memory used by buffered items in bytes
    pub estimated_memory: usize,
    /// Total rows yielded to consumer so far
    pub total_rows_yielded: u64,
    /// Total rows filtered out by Rust predicates
    pub total_rows_filtered: u64,
}

impl StreamStats {
    /// Create zero-valued stats
    ///
    /// Useful for testing and initialization. Equivalent to
    /// [`StreamStats::default`]; all fields are numeric, so the derived
    /// `Default` yields the all-zero snapshot.
    pub fn zero() -> Self {
        Self::default()
    }
}
66
/// JSON value stream
///
/// Consumer-facing handle over a background task that reads rows from
/// Postgres and sends parsed JSON values through `receiver`. Dropping the
/// stream drops `_cancel_tx`, which signals cancellation to the task.
pub struct JsonStream {
    receiver: mpsc::Receiver<Result<Value>>,
    _cancel_tx: mpsc::Sender<()>, // Dropped when stream is dropped
    entity: String,               // Entity name for metrics
    rows_yielded: Arc<AtomicU64>, // Counter of items yielded to consumer
    rows_filtered: Arc<AtomicU64>, // Counter of items filtered
    max_memory: Option<usize>,    // Optional memory limit in bytes
    soft_limit_fail_threshold: Option<f32>, // Fail at threshold % (0.0-1.0)

    // Lightweight state tracking (cheap AtomicU8)
    // Used for fast state checks on all queries.
    // Values: 0=Running, 1=Paused. NOTE(review): 2=Complete and 3=Error are
    // mentioned here but no such constants exist in this file — presumably
    // written by the background task elsewhere; confirm.
    state_atomic: Arc<AtomicU8>,

    // Pause/resume state machine (lazily initialized)
    // Only allocated if pause() is called
    pause_resume: Option<PauseResumeState>,

    // Sampling counter for metrics recording (sample 1 in N polls)
    poll_count: AtomicU64, // Counter for sampling metrics
}
89
/// Pause/resume state (lazily allocated)
///
/// Only created when pause() is first called, so streams that are never
/// paused pay no Mutex/Notify allocation cost. The `Arc`s are cloned and
/// handed to the background task via the `clone_*` accessors on `JsonStream`.
pub struct PauseResumeState {
    state: Arc<Mutex<StreamState>>, // Current stream state (authoritative)
    pause_signal: Arc<Notify>,      // Signal to pause background task
    resume_signal: Arc<Notify>,     // Signal to resume background task
    paused_occupancy: Arc<AtomicUsize>, // Buffered rows when paused
    pause_timeout: Option<Duration>, // Optional auto-resume timeout
}
99
100impl JsonStream {
101 /// Create new JSON stream
102 pub(crate) fn new(
103 receiver: mpsc::Receiver<Result<Value>>,
104 cancel_tx: mpsc::Sender<()>,
105 entity: String,
106 max_memory: Option<usize>,
107 _soft_limit_warn_threshold: Option<f32>,
108 soft_limit_fail_threshold: Option<f32>,
109 ) -> Self {
110 Self {
111 receiver,
112 _cancel_tx: cancel_tx,
113 entity,
114 rows_yielded: Arc::new(AtomicU64::new(0)),
115 rows_filtered: Arc::new(AtomicU64::new(0)),
116 max_memory,
117 soft_limit_fail_threshold,
118
119 // Initialize lightweight atomic state
120 state_atomic: Arc::new(AtomicU8::new(STATE_RUNNING)),
121
122 // Pause/resume lazily initialized (only if pause() is called)
123 pause_resume: None,
124
125 // Initialize sampling counter
126 poll_count: AtomicU64::new(0),
127 }
128 }
129
130 /// Initialize pause/resume state (called on first pause())
131 fn ensure_pause_resume(&mut self) -> &mut PauseResumeState {
132 if self.pause_resume.is_none() {
133 self.pause_resume = Some(PauseResumeState {
134 state: Arc::new(Mutex::new(StreamState::Running)),
135 pause_signal: Arc::new(Notify::new()),
136 resume_signal: Arc::new(Notify::new()),
137 paused_occupancy: Arc::new(AtomicUsize::new(0)),
138 pause_timeout: None,
139 });
140 }
141 self.pause_resume.as_mut().unwrap()
142 }
143
144 /// Get current stream state
145 ///
146 /// Returns the current state of the stream (Running, Paused, Completed, or Failed).
147 /// This is a synchronous getter that doesn't require awaiting.
148 pub fn state_snapshot(&self) -> StreamState {
149 // This is a best-effort snapshot that may return slightly stale state
150 // For guaranteed accurate state, use `state()` method
151 StreamState::Running // Will be updated when state machine is fully integrated
152 }
153
154 /// Get buffered rows when paused
155 ///
156 /// Returns the number of rows buffered in the channel when the stream was paused.
157 /// Only meaningful when stream is in Paused state.
158 pub fn paused_occupancy(&self) -> usize {
159 self.pause_resume
160 .as_ref()
161 .map(|pr| pr.paused_occupancy.load(Ordering::Relaxed))
162 .unwrap_or(0)
163 }
164
165 /// Set timeout for pause (auto-resume after duration)
166 ///
167 /// When a stream is paused, the background task will automatically resume
168 /// after the specified duration expires, even if resume() is not called.
169 ///
170 /// # Arguments
171 ///
172 /// * `duration` - How long to stay paused before auto-resuming
173 ///
174 /// # Examples
175 ///
176 /// ```ignore
177 /// let mut stream = client.query::<T>("entity").execute().await?;
178 /// stream.set_pause_timeout(Duration::from_secs(5));
179 /// stream.pause().await?; // Will auto-resume after 5 seconds
180 /// ```
181 pub fn set_pause_timeout(&mut self, duration: Duration) {
182 self.ensure_pause_resume().pause_timeout = Some(duration);
183 tracing::debug!("pause timeout set to {:?}", duration);
184 }
185
186 /// Clear pause timeout (no auto-resume)
187 pub fn clear_pause_timeout(&mut self) {
188 if let Some(ref mut pr) = self.pause_resume {
189 pr.pause_timeout = None;
190 tracing::debug!("pause timeout cleared");
191 }
192 }
193
194 /// Get current pause timeout (if set)
195 pub(crate) fn pause_timeout(&self) -> Option<Duration> {
196 self.pause_resume.as_ref().and_then(|pr| pr.pause_timeout)
197 }
198
199 /// Pause the stream
200 ///
201 /// Suspends the background task from reading more data from Postgres.
202 /// The connection remains open and can be resumed later.
203 /// Buffered rows are preserved and can be consumed normally.
204 ///
205 /// This method is idempotent: calling pause() on an already-paused stream is a no-op.
206 ///
207 /// Returns an error if the stream has already completed or failed.
208 ///
209 /// # Example
210 ///
211 /// ```ignore
212 /// stream.pause().await?;
213 /// // Background task stops reading
214 /// // Consumer can still poll for remaining buffered items
215 /// ```
216 pub async fn pause(&mut self) -> Result<()> {
217 let entity = self.entity.clone();
218
219 // Update lightweight atomic state first (fast path)
220 self.state_atomic_set_paused();
221
222 let pr = self.ensure_pause_resume();
223 let mut state = pr.state.lock().await;
224
225 match *state {
226 StreamState::Running => {
227 // Signal background task to pause
228 pr.pause_signal.notify_one();
229 // Update state
230 *state = StreamState::Paused;
231
232 // Record metric
233 crate::metrics::counters::stream_paused(&entity);
234 Ok(())
235 }
236 StreamState::Paused => {
237 // Idempotent: already paused
238 Ok(())
239 }
240 StreamState::Completed | StreamState::Failed => {
241 // Cannot pause a terminal stream
242 Err(Error::Protocol(
243 "cannot pause a completed or failed stream".to_string(),
244 ))
245 }
246 }
247 }
248
249 /// Resume the stream
250 ///
251 /// Resumes the background task to continue reading data from Postgres.
252 /// Only has an effect if the stream is currently paused.
253 ///
254 /// This method is idempotent: calling resume() before pause() or on an
255 /// already-running stream is a no-op.
256 ///
257 /// Returns an error if the stream has already completed or failed.
258 ///
259 /// # Example
260 ///
261 /// ```ignore
262 /// stream.resume().await?;
263 /// // Background task resumes reading
264 /// // Consumer can poll for more items
265 /// ```
266 pub async fn resume(&mut self) -> Result<()> {
267 // Update lightweight atomic state first (fast path)
268 // Check atomic state before borrowing pause_resume
269 let current = self.state_atomic_get();
270
271 // Resume only makes sense if pause/resume was initialized
272 if let Some(ref mut pr) = self.pause_resume {
273 let entity = self.entity.clone();
274
275 // Note: Set to RUNNING to reflect resumed state
276 if current == STATE_PAUSED {
277 // Only update atomic if currently paused
278 self.state_atomic.store(STATE_RUNNING, Ordering::Release);
279 }
280
281 let mut state = pr.state.lock().await;
282
283 match *state {
284 StreamState::Paused => {
285 // Signal background task to resume
286 pr.resume_signal.notify_one();
287 // Update state
288 *state = StreamState::Running;
289
290 // Record metric
291 crate::metrics::counters::stream_resumed(&entity);
292 Ok(())
293 }
294 StreamState::Running => {
295 // Idempotent: already running (or resume before pause)
296 Ok(())
297 }
298 StreamState::Completed | StreamState::Failed => {
299 // Cannot resume a terminal stream
300 Err(Error::Protocol(
301 "cannot resume a completed or failed stream".to_string(),
302 ))
303 }
304 }
305 } else {
306 // No pause/resume infrastructure (never paused), idempotent no-op
307 Ok(())
308 }
309 }
310
311 /// Pause the stream with a diagnostic reason
312 ///
313 /// Like pause(), but logs the provided reason for diagnostic purposes.
314 /// This helps track why streams are being paused (e.g., "backpressure",
315 /// "maintenance", "rate limit").
316 ///
317 /// # Arguments
318 ///
319 /// * `reason` - Optional reason for pausing (logged at debug level)
320 ///
321 /// # Example
322 ///
323 /// ```ignore
324 /// stream.pause_with_reason("backpressure: consumer busy").await?;
325 /// ```
326 pub async fn pause_with_reason(&mut self, reason: &str) -> Result<()> {
327 tracing::debug!("pausing stream: {}", reason);
328 self.pause().await
329 }
330
331 /// Clone internal state for passing to background task (only if pause/resume is initialized)
332 pub(crate) fn clone_state(&self) -> Option<Arc<Mutex<StreamState>>> {
333 self.pause_resume.as_ref().map(|pr| Arc::clone(&pr.state))
334 }
335
336 /// Clone pause signal for passing to background task (only if pause/resume is initialized)
337 pub(crate) fn clone_pause_signal(&self) -> Option<Arc<Notify>> {
338 self.pause_resume
339 .as_ref()
340 .map(|pr| Arc::clone(&pr.pause_signal))
341 }
342
343 /// Clone resume signal for passing to background task (only if pause/resume is initialized)
344 pub(crate) fn clone_resume_signal(&self) -> Option<Arc<Notify>> {
345 self.pause_resume
346 .as_ref()
347 .map(|pr| Arc::clone(&pr.resume_signal))
348 }
349
350 // =========================================================================
351 // Lightweight state machine methods
352 // =========================================================================
353
354 /// Clone atomic state for passing to background task
355 pub(crate) fn clone_state_atomic(&self) -> Arc<AtomicU8> {
356 Arc::clone(&self.state_atomic)
357 }
358
359 /// Get current state from atomic (fast path, no locks)
360 pub(crate) fn state_atomic_get(&self) -> u8 {
361 self.state_atomic.load(Ordering::Acquire)
362 }
363
364 /// Set state to paused using atomic
365 pub(crate) fn state_atomic_set_paused(&self) {
366 self.state_atomic.store(STATE_PAUSED, Ordering::Release);
367 }
368
369 /// Get current stream statistics
370 ///
371 /// Returns a snapshot of stream state without consuming any items.
372 /// This can be called at any time to monitor progress.
373 ///
374 /// # Example
375 ///
376 /// ```ignore
377 /// let stats = stream.stats();
378 /// println!("Buffered: {}, Yielded: {}", stats.items_buffered, stats.total_rows_yielded);
379 /// ```
380 pub fn stats(&self) -> StreamStats {
381 let items_buffered = self.receiver.len();
382 let estimated_memory = items_buffered * 2048; // Conservative: 2KB per item
383 let total_rows_yielded = self.rows_yielded.load(Ordering::Relaxed);
384 let total_rows_filtered = self.rows_filtered.load(Ordering::Relaxed);
385
386 StreamStats {
387 items_buffered,
388 estimated_memory,
389 total_rows_yielded,
390 total_rows_filtered,
391 }
392 }
393
394 /// Increment rows yielded counter (called from FilteredStream)
395 #[allow(unused)]
396 pub(crate) fn increment_rows_yielded(&self, count: u64) {
397 self.rows_yielded.fetch_add(count, Ordering::Relaxed);
398 }
399
400 /// Increment rows filtered counter (called from FilteredStream)
401 #[allow(unused)]
402 pub(crate) fn increment_rows_filtered(&self, count: u64) {
403 self.rows_filtered.fetch_add(count, Ordering::Relaxed);
404 }
405
406 /// Clone the yielded counter for passing to background task
407 #[allow(unused)]
408 pub(crate) fn clone_rows_yielded(&self) -> Arc<AtomicU64> {
409 Arc::clone(&self.rows_yielded)
410 }
411
412 /// Clone the filtered counter for passing to background task
413 #[allow(unused)]
414 pub(crate) fn clone_rows_filtered(&self) -> Arc<AtomicU64> {
415 Arc::clone(&self.rows_filtered)
416 }
417}
418
419impl Stream for JsonStream {
420 type Item = Result<Value>;
421
422 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
423 // Sample metrics: record 1 in every 1000 polls to avoid hot path overhead
424 // For 100K rows, this records ~100 times instead of 100K times
425 let poll_idx = self.poll_count.fetch_add(1, Ordering::Relaxed);
426 if poll_idx % 1000 == 0 {
427 let occupancy = self.receiver.len() as u64;
428 crate::metrics::histograms::channel_occupancy(&self.entity, occupancy);
429 crate::metrics::gauges::stream_buffered_items(&self.entity, occupancy as usize);
430 }
431
432 // Check memory limit BEFORE receiving (pre-enqueue strategy)
433 // This stops consuming when buffer reaches limit
434 if let Some(limit) = self.max_memory {
435 let items_buffered = self.receiver.len();
436 let estimated_memory = items_buffered * 2048; // Conservative: 2KB per item
437
438 // Check soft limit thresholds first (warn before fail)
439 if let Some(fail_threshold) = self.soft_limit_fail_threshold {
440 let threshold_bytes = (limit as f32 * fail_threshold) as usize;
441 if estimated_memory > threshold_bytes {
442 // Record metric for memory limit exceeded
443 crate::metrics::counters::memory_limit_exceeded(&self.entity);
444 return Poll::Ready(Some(Err(Error::MemoryLimitExceeded {
445 limit,
446 estimated_memory,
447 })));
448 }
449 } else if estimated_memory > limit {
450 // Hard limit (no soft limits configured)
451 crate::metrics::counters::memory_limit_exceeded(&self.entity);
452 return Poll::Ready(Some(Err(Error::MemoryLimitExceeded {
453 limit,
454 estimated_memory,
455 })));
456 }
457
458 // Note: Warn threshold would be handled by instrumentation/logging layer
459 // This is for application-level monitoring, not a hard error
460 }
461
462 self.receiver.poll_recv(cx)
463 }
464}
465
466/// Extract JSON bytes from DataRow message
467pub fn extract_json_bytes(msg: &BackendMessage) -> Result<Bytes> {
468 match msg {
469 BackendMessage::DataRow(fields) => {
470 if fields.len() != 1 {
471 return Err(Error::Protocol(format!(
472 "expected 1 field, got {}",
473 fields.len()
474 )));
475 }
476
477 let field = &fields[0];
478 field
479 .clone()
480 .ok_or_else(|| Error::Protocol("null data field".into()))
481 }
482 _ => Err(Error::Protocol("expected DataRow".into())),
483 }
484}
485
486/// Parse JSON bytes into Value
487pub fn parse_json(data: Bytes) -> Result<Value> {
488 let value: Value = serde_json::from_slice(&data)?;
489 Ok(value)
490}
491
#[cfg(test)]
mod tests {
    use super::*;

    // A single-field DataRow yields its payload verbatim.
    #[test]
    fn test_extract_json_bytes() {
        let data = Bytes::from_static(b"{\"key\":\"value\"}");
        let msg = BackendMessage::DataRow(vec![Some(data.clone())]);

        let extracted = extract_json_bytes(&msg).unwrap();
        assert_eq!(extracted, data);
    }

    // A NULL field is a protocol error, not an empty payload.
    #[test]
    fn test_extract_null_field() {
        let msg = BackendMessage::DataRow(vec![None]);
        assert!(extract_json_bytes(&msg).is_err());
    }

    #[test]
    fn test_parse_json() {
        let data = Bytes::from_static(b"{\"key\":\"value\"}");
        let value = parse_json(data).unwrap();

        assert_eq!(value["key"], "value");
    }

    // Malformed input must surface as Err, not panic.
    #[test]
    fn test_parse_invalid_json() {
        let data = Bytes::from_static(b"not json");
        assert!(parse_json(data).is_err());
    }

    #[test]
    fn test_stream_stats_creation() {
        let stats = StreamStats::zero();
        assert_eq!(stats.items_buffered, 0);
        assert_eq!(stats.estimated_memory, 0);
        assert_eq!(stats.total_rows_yielded, 0);
        assert_eq!(stats.total_rows_filtered, 0);
    }

    // Mirrors the 2KB-per-item estimate used elsewhere in this module.
    #[test]
    fn test_stream_stats_memory_estimation() {
        let stats = StreamStats {
            items_buffered: 100,
            estimated_memory: 100 * 2048,
            total_rows_yielded: 100,
            total_rows_filtered: 10,
        };

        // 100 items * 2KB per item = 200KB
        assert_eq!(stats.estimated_memory, 204800);
    }

    // StreamStats derives Clone; a clone must be field-for-field identical.
    #[test]
    fn test_stream_stats_clone() {
        let stats = StreamStats {
            items_buffered: 50,
            estimated_memory: 100000,
            total_rows_yielded: 500,
            total_rows_filtered: 50,
        };

        let cloned = stats.clone();
        assert_eq!(cloned.items_buffered, stats.items_buffered);
        assert_eq!(cloned.estimated_memory, stats.estimated_memory);
        assert_eq!(cloned.total_rows_yielded, stats.total_rows_yielded);
        assert_eq!(cloned.total_rows_filtered, stats.total_rows_filtered);
    }
}
562}