Skip to main content

reifydb_function/subscription/
inspect.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_catalog::find_subscription;
5use reifydb_core::{
6	error::diagnostic::internal::internal,
7	interface::catalog::id::SubscriptionId,
8	key::{Key, subscription_row::SubscriptionRowKey},
9	value::column::{Column, columns::Columns, data::ColumnData},
10};
11use reifydb_type::{error::Error, fragment::Fragment};
12
13use crate::{GeneratorContext, GeneratorFunction, error::GeneratorFunctionResult};
14
15pub struct InspectSubscription;
16
17impl InspectSubscription {
18	pub fn new() -> Self {
19		Self {}
20	}
21}
22
23impl GeneratorFunction for InspectSubscription {
24	fn generate<'a>(&self, ctx: GeneratorContext<'a>) -> GeneratorFunctionResult<Columns> {
25		let txn = ctx.txn;
26
27		let params = &ctx.params;
28		if params.len() != 1 {
29			panic!("inspect_subscription requires exactly 1 parameter: subscription_id (u64)");
30		}
31
32		let id_column = params.get(0).unwrap();
33		let subscription_id_value = match id_column.data() {
34			ColumnData::Uint8(container) => {
35				container.get(0).copied().expect("subscription_id parameter is empty")
36			}
37			ColumnData::Utf8 {
38				container,
39				..
40			} => {
41				let id_str = container.get(0).expect("subscription_id parameter is empty");
42				id_str.parse::<u64>().expect("Invalid subscription_id format")
43			}
44			_ => panic!("subscription_id must be of type u64 or utf8"),
45		};
46
47		let subscription_id = SubscriptionId(subscription_id_value);
48
49		// Use catalog function to get subscription definition
50		let subscription_def = find_subscription(txn, subscription_id)?
51			.unwrap_or_else(|| panic!("Subscription {} not found", subscription_id));
52
53		// Scan subscription rows
54		let range = SubscriptionRowKey::full_scan(subscription_id);
55		let mut stream = txn.range(range, 1024)?;
56
57		// Build columns structure
58		let all_columns = subscription_def.all_columns();
59		let mut column_data_builders: Vec<_> = all_columns
60			.iter()
61			.map(|col| (col.name.clone(), ColumnData::with_capacity(col.ty.clone(), 0)))
62			.collect();
63
64		let mut row_numbers = Vec::new();
65
66		// Collect all entries first to avoid borrow checker issues
67		let mut entries = Vec::new();
68		while let Some(result) = stream.next() {
69			entries.push(result?);
70		}
71		drop(stream); // Explicitly drop to release the borrow on txn
72
73		let catalog = ctx.catalog;
74		let schema_registry = &catalog.schema;
75
76		// Process collected entries
77		for entry in entries {
78			if let Some(Key::SubscriptionRow(sub_row_key)) = Key::decode(&entry.key) {
79				row_numbers.push(sub_row_key.row);
80
81				let fingerprint = entry.values.fingerprint();
82				let schema = schema_registry.get_or_load(fingerprint, txn)?.ok_or_else(|| {
83					Error(internal(format!("Schema not found for fingerprint: {:?}", fingerprint)))
84				})?;
85
86				for (idx, (_, data)) in column_data_builders.iter_mut().enumerate() {
87					let value = schema.get_value(&entry.values, idx);
88					data.push_value(value);
89				}
90			}
91		}
92
93		let columns: Vec<Column> = column_data_builders
94			.into_iter()
95			.map(|(name, data)| Column {
96				name: Fragment::internal(&name),
97				data,
98			})
99			.collect();
100
101		Ok(Columns::with_row_numbers(columns, row_numbers))
102	}
103}