reifydb_core/interface/catalog/
source.rs1use reifydb_type::return_internal_error;
5use serde::{Deserialize, Serialize};
6
7use crate::interface::{
8 FlowDef, FlowId, RingBufferId, TableDef, TableId, TableVirtualDef, TableVirtualId, ViewDef, ViewId,
9};
10
11#[derive(Debug, Copy, Clone, PartialOrd, PartialEq, Ord, Eq, Hash, Serialize, Deserialize)]
12pub enum SourceId {
13 Table(TableId),
14 View(ViewId),
15 Flow(FlowId),
16 TableVirtual(TableVirtualId),
17 RingBuffer(RingBufferId),
18}
19
20impl std::fmt::Display for SourceId {
21 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22 match self {
23 SourceId::Table(id) => write!(f, "{}", id.0),
24 SourceId::View(id) => write!(f, "{}", id.0),
25 SourceId::Flow(id) => write!(f, "{}", id.0),
26 SourceId::TableVirtual(id) => write!(f, "{}", id.0),
27 SourceId::RingBuffer(id) => write!(f, "{}", id.0),
28 }
29 }
30}
31
32impl SourceId {
33 pub fn table(id: impl Into<TableId>) -> Self {
34 Self::Table(id.into())
35 }
36
37 pub fn view(id: impl Into<ViewId>) -> Self {
38 Self::View(id.into())
39 }
40
41 pub fn flow(id: impl Into<FlowId>) -> Self {
42 Self::Flow(id.into())
43 }
44
45 pub fn table_virtual(id: impl Into<TableVirtualId>) -> Self {
46 Self::TableVirtual(id.into())
47 }
48
49 pub fn ring_buffer(id: impl Into<RingBufferId>) -> Self {
50 Self::RingBuffer(id.into())
51 }
52}
53
54impl From<TableId> for SourceId {
55 fn from(id: TableId) -> Self {
56 SourceId::Table(id)
57 }
58}
59
60impl From<ViewId> for SourceId {
61 fn from(id: ViewId) -> Self {
62 SourceId::View(id)
63 }
64}
65
66impl From<FlowId> for SourceId {
67 fn from(id: FlowId) -> Self {
68 SourceId::Flow(id)
69 }
70}
71
72impl From<TableVirtualId> for SourceId {
73 fn from(id: TableVirtualId) -> Self {
74 SourceId::TableVirtual(id)
75 }
76}
77
78impl From<RingBufferId> for SourceId {
79 fn from(id: RingBufferId) -> Self {
80 SourceId::RingBuffer(id)
81 }
82}
83
84impl PartialEq<u64> for SourceId {
85 fn eq(&self, other: &u64) -> bool {
86 match self {
87 SourceId::Table(id) => id.0.eq(other),
88 SourceId::View(id) => id.0.eq(other),
89 SourceId::Flow(id) => id.0.eq(other),
90 SourceId::TableVirtual(id) => id.0.eq(other),
91 SourceId::RingBuffer(id) => id.0.eq(other),
92 }
93 }
94}
95
96impl PartialEq<TableId> for SourceId {
97 fn eq(&self, other: &TableId) -> bool {
98 match self {
99 SourceId::Table(id) => id.0 == other.0,
100 _ => false,
101 }
102 }
103}
104
105impl PartialEq<ViewId> for SourceId {
106 fn eq(&self, other: &ViewId) -> bool {
107 match self {
108 SourceId::View(id) => id.0 == other.0,
109 _ => false,
110 }
111 }
112}
113
114impl PartialEq<FlowId> for SourceId {
115 fn eq(&self, other: &FlowId) -> bool {
116 match self {
117 SourceId::Flow(id) => id.0 == other.0,
118 _ => false,
119 }
120 }
121}
122
123impl PartialEq<TableVirtualId> for SourceId {
124 fn eq(&self, other: &TableVirtualId) -> bool {
125 match self {
126 SourceId::TableVirtual(id) => id.0 == other.0,
127 _ => false,
128 }
129 }
130}
131
132impl PartialEq<RingBufferId> for SourceId {
133 fn eq(&self, other: &RingBufferId) -> bool {
134 match self {
135 SourceId::RingBuffer(id) => id.0 == other.0,
136 _ => false,
137 }
138 }
139}
140
141impl From<SourceId> for u64 {
142 fn from(source: SourceId) -> u64 {
143 source.as_u64()
144 }
145}
146
147impl SourceId {
148 pub fn as_u64(&self) -> u64 {
150 match self {
151 SourceId::Table(id) => id.0,
152 SourceId::View(id) => id.0,
153 SourceId::Flow(id) => id.0,
154 SourceId::TableVirtual(id) => id.0,
155 SourceId::RingBuffer(id) => id.0,
156 }
157 }
158
159 pub fn next(&self) -> SourceId {
161 match self {
162 SourceId::Table(table) => SourceId::table(table.0 + 1),
163 SourceId::View(view) => SourceId::view(view.0 + 1),
164 SourceId::Flow(flow) => SourceId::flow(flow.0 + 1),
165 SourceId::TableVirtual(table_virtual) => SourceId::table_virtual(table_virtual.0 + 1),
166 SourceId::RingBuffer(ring_buffer) => SourceId::ring_buffer(ring_buffer.0 + 1),
167 }
168 }
169
170 pub fn prev(&self) -> SourceId {
175 match self {
176 SourceId::Table(table) => SourceId::table(table.0.wrapping_sub(1)),
177 SourceId::View(view) => SourceId::view(view.0.wrapping_sub(1)),
178 SourceId::Flow(flow) => SourceId::flow(flow.0.wrapping_sub(1)),
179 SourceId::TableVirtual(table_virtual) => {
180 SourceId::table_virtual(table_virtual.0.wrapping_sub(1))
181 }
182 SourceId::RingBuffer(ring_buffer) => SourceId::ring_buffer(ring_buffer.0.wrapping_sub(1)),
183 }
184 }
185
186 pub fn to_table_id(self) -> crate::Result<TableId> {
187 if let SourceId::Table(table) = self {
188 Ok(table)
189 } else {
190 return_internal_error!(
191 "Data inconsistency: Expected SourceId::Table but found {:?}. \
192 This indicates a critical catalog inconsistency where a non-table source ID \
193 was used in a context that requires a table ID.",
194 self
195 )
196 }
197 }
198
199 pub fn to_view_id(self) -> crate::Result<ViewId> {
200 if let SourceId::View(view) = self {
201 Ok(view)
202 } else {
203 return_internal_error!(
204 "Data inconsistency: Expected SourceId::View but found {:?}. \
205 This indicates a critical catalog inconsistency where a non-view source ID \
206 was used in a context that requires a view ID.",
207 self
208 )
209 }
210 }
211
212 pub fn to_flow_id(self) -> crate::Result<FlowId> {
213 if let SourceId::Flow(flow) = self {
214 Ok(flow)
215 } else {
216 return_internal_error!(
217 "Data inconsistency: Expected SourceId::Flow but found {:?}. \
218 This indicates a critical catalog inconsistency where a non-flow source ID \
219 was used in a context that requires a flow ID.",
220 self
221 )
222 }
223 }
224
225 pub fn to_table_virtual_id(self) -> crate::Result<TableVirtualId> {
226 if let SourceId::TableVirtual(table_virtual) = self {
227 Ok(table_virtual)
228 } else {
229 return_internal_error!(
230 "Data inconsistency: Expected SourceId::TableVirtual but found {:?}. \
231 This indicates a critical catalog inconsistency where a non-virtual-table source ID \
232 was used in a context that requires a virtual table ID.",
233 self
234 )
235 }
236 }
237
238 pub fn to_ring_buffer_id(self) -> crate::Result<RingBufferId> {
239 if let SourceId::RingBuffer(ring_buffer) = self {
240 Ok(ring_buffer)
241 } else {
242 return_internal_error!(
243 "Data inconsistency: Expected SourceId::RingBuffer but found {:?}. \
244 This indicates a critical catalog inconsistency where a non-ring-buffer source ID \
245 was used in a context that requires a ring buffer ID.",
246 self
247 )
248 }
249 }
250}
251
252#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
253pub enum SourceDef {
254 Table(TableDef),
255 View(ViewDef),
256 Flow(FlowDef),
257 TableVirtual(TableVirtualDef),
258}
259
260impl SourceDef {
261 pub fn id(&self) -> SourceId {
262 match self {
263 SourceDef::Table(table) => table.id.into(),
264 SourceDef::View(view) => view.id.into(),
265 SourceDef::Flow(flow) => flow.id.into(),
266 SourceDef::TableVirtual(table_virtual) => table_virtual.id.into(),
267 }
268 }
269
270 pub fn source_type(&self) -> SourceId {
271 match self {
272 SourceDef::Table(table) => SourceId::Table(table.id),
273 SourceDef::View(view) => SourceId::View(view.id),
274 SourceDef::Flow(flow) => SourceId::Flow(flow.id),
275 SourceDef::TableVirtual(table_virtual) => SourceId::TableVirtual(table_virtual.id),
276 }
277 }
278}