reifydb_core/stream/
error.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later
3
4//! Error types for streaming query execution.
5
6use std::{fmt, sync::Arc};
7
8use reifydb_type::diagnostic::Diagnostic;
9
10/// Result type for stream items.
11pub type StreamResult<T> = Result<T, StreamError>;
12
13/// Errors that can occur during streaming query execution.
14#[derive(Debug, Clone)]
15pub enum StreamError {
16	/// Query execution error from the engine.
17	Query {
18		/// The underlying diagnostic error.
19		diagnostic: Arc<Diagnostic>,
20		/// The statement that caused the error (if applicable).
21		statement: Option<String>,
22	},
23
24	/// Stream was cancelled via cancellation token.
25	Cancelled,
26
27	/// Query execution exceeded timeout.
28	Timeout,
29
30	/// Internal channel error (producer dropped unexpectedly).
31	Disconnected,
32}
33
34impl fmt::Display for StreamError {
35	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
36		match self {
37			StreamError::Query {
38				diagnostic,
39				..
40			} => {
41				// Use full diagnostic rendering for proper error display
42				use reifydb_type::diagnostic::render::DefaultRenderer;
43				let rendered = DefaultRenderer::render_string(diagnostic);
44				write!(f, "{}", rendered)
45			}
46			StreamError::Cancelled => write!(f, "Query was cancelled"),
47			StreamError::Timeout => write!(f, "Query execution timed out"),
48			StreamError::Disconnected => write!(f, "Query stream disconnected unexpectedly"),
49		}
50	}
51}
52
53impl std::error::Error for StreamError {}
54
55impl From<reifydb_type::Error> for StreamError {
56	fn from(err: reifydb_type::Error) -> Self {
57		StreamError::Query {
58			diagnostic: Arc::new(err.diagnostic()),
59			statement: None,
60		}
61	}
62}
63
64impl StreamError {
65	/// Create a query error with a statement context.
66	pub fn query_with_statement(err: reifydb_type::Error, statement: String) -> Self {
67		let mut diagnostic = err.diagnostic();
68		// Set the statement on the diagnostic so the renderer can display it
69		diagnostic.with_statement(statement.clone());
70		StreamError::Query {
71			diagnostic: Arc::new(diagnostic),
72			statement: Some(statement),
73		}
74	}
75}