Skip to main content

reifydb_function/subscription/
inspect.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::{
5	error::diagnostic::internal::internal,
6	interface::catalog::id::SubscriptionId,
7	key::{Key, subscription_row::SubscriptionRowKey},
8	value::column::{Column, columns::Columns, data::ColumnData},
9};
10use reifydb_type::{error::Error, fragment::Fragment};
11
12use crate::{GeneratorContext, GeneratorFunction};
13
/// Generator function that exposes the stored rows of a subscription.
///
/// The type carries no state; all work happens in its `GeneratorFunction`
/// implementation.
#[derive(Debug, Default)]
pub struct InspectSubscription;

impl InspectSubscription {
	/// Creates the generator. Equivalent to `InspectSubscription::default()`.
	pub fn new() -> Self {
		Self
	}
}
21
22impl GeneratorFunction for InspectSubscription {
23	fn generate<'a>(&self, ctx: GeneratorContext<'a>) -> crate::error::GeneratorFunctionResult<Columns> {
24		let txn = ctx.txn;
25
26		let params = &ctx.params;
27		if params.len() != 1 {
28			panic!("inspect_subscription requires exactly 1 parameter: subscription_id (u64)");
29		}
30
31		let id_column = params.get(0).unwrap();
32		let subscription_id_value = match id_column.data() {
33			ColumnData::Uint8(container) => {
34				container.get(0).copied().expect("subscription_id parameter is empty")
35			}
36			ColumnData::Utf8 {
37				container,
38				..
39			} => {
40				let id_str = container.get(0).expect("subscription_id parameter is empty");
41				id_str.parse::<u64>().expect("Invalid subscription_id format")
42			}
43			_ => panic!("subscription_id must be of type u64 or utf8"),
44		};
45
46		let subscription_id = SubscriptionId(subscription_id_value);
47
48		// Use catalog function to get subscription definition
49		let subscription_def = reifydb_catalog::find_subscription(txn, subscription_id)?
50			.unwrap_or_else(|| panic!("Subscription {} not found", subscription_id));
51
52		// Scan subscription rows
53		let range = SubscriptionRowKey::full_scan(subscription_id);
54		let mut stream = txn.range(range, 1024)?;
55
56		// Build columns structure
57		let all_columns = subscription_def.all_columns();
58		let mut column_data_builders: Vec<_> = all_columns
59			.iter()
60			.map(|col| (col.name.clone(), ColumnData::with_capacity(col.ty.clone(), 0)))
61			.collect();
62
63		let mut row_numbers = Vec::new();
64
65		// Collect all entries first to avoid borrow checker issues
66		let mut entries = Vec::new();
67		while let Some(result) = stream.next() {
68			entries.push(result?);
69		}
70		drop(stream); // Explicitly drop to release the borrow on txn
71
72		let catalog = ctx.catalog;
73		let schema_registry = &catalog.schema;
74
75		// Process collected entries
76		for entry in entries {
77			if let Some(Key::SubscriptionRow(sub_row_key)) = Key::decode(&entry.key) {
78				row_numbers.push(sub_row_key.row);
79
80				let fingerprint = entry.values.fingerprint();
81				let schema = schema_registry.get_or_load(fingerprint, txn)?.ok_or_else(|| {
82					Error(internal(format!("Schema not found for fingerprint: {:?}", fingerprint)))
83				})?;
84
85				for (idx, (_, data)) in column_data_builders.iter_mut().enumerate() {
86					let value = schema.get_value(&entry.values, idx);
87					data.push_value(value);
88				}
89			}
90		}
91
92		let columns: Vec<Column> = column_data_builders
93			.into_iter()
94			.map(|(name, data)| Column {
95				name: Fragment::internal(&name),
96				data,
97			})
98			.collect();
99
100		Ok(Columns::with_row_numbers(columns, row_numbers))
101	}
102}