Skip to main content

reifydb_engine/
engine.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	ops::Deref,
6	sync::{
7		Arc,
8		atomic::{AtomicBool, Ordering},
9	},
10	time::Duration,
11};
12
13use reifydb_auth::service::AuthEngine;
14use reifydb_catalog::{
15	catalog::Catalog,
16	materialized::MaterializedCatalog,
17	vtable::{
18		system::flow_operator_store::{SystemFlowOperatorEventListener, SystemFlowOperatorStore},
19		tables::UserVTableDataFunction,
20		user::{UserVTable, UserVTableColumn, registry::UserVTableEntry},
21	},
22};
23use reifydb_cdc::{consume::host::CdcHost, produce::watermark::CdcProducerWatermark, storage::CdcStore};
24use reifydb_core::{
25	common::CommitVersion,
26	error::diagnostic::{catalog::namespace_not_found, engine::read_only_rejection},
27	event::{Event, EventBus},
28	execution::ExecutionResult,
29	interface::{
30		WithEventBus,
31		catalog::{
32			column::{Column, ColumnIndex},
33			id::ColumnId,
34			vtable::{VTable, VTableId},
35		},
36	},
37	metric::ExecutionMetrics,
38	util::ioc::IocContainer,
39};
40use reifydb_metric::storage::metric::MetricReader;
41use reifydb_runtime::{
42	actor::{mailbox::ActorRef, system::ActorSystem},
43	context::{clock::Clock, rng::Rng},
44};
45use reifydb_store_single::SingleStore;
46use reifydb_transaction::{
47	interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
48	multi::transaction::MultiTransaction,
49	single::SingleTransaction,
50	transaction::{admin::AdminTransaction, command::CommandTransaction, query::QueryTransaction},
51};
52use reifydb_type::{
53	error::Error,
54	fragment::Fragment,
55	params::Params,
56	value::{constraint::TypeConstraint, identity::IdentityId},
57};
58use tracing::instrument;
59
60use crate::{
61	Result,
62	bulk_insert::builder::{BulkInsertBuilder, Unchecked, Validated},
63	interceptor::catalog::MaterializedCatalogInterceptor,
64	vm::{
65		Admin, Command, Query, Subscription,
66		executor::Executor,
67		services::{EngineConfig, Services},
68	},
69};
70
71pub struct StandardEngine(Arc<Inner>);
72
73impl WithEventBus for StandardEngine {
74	fn event_bus(&self) -> &EventBus {
75		&self.event_bus
76	}
77}
78
79impl AuthEngine for StandardEngine {
80	fn begin_admin(&self) -> Result<AdminTransaction> {
81		StandardEngine::begin_admin(self, IdentityId::system())
82	}
83
84	fn begin_query(&self) -> Result<QueryTransaction> {
85		StandardEngine::begin_query(self, IdentityId::system())
86	}
87
88	fn catalog(&self) -> Catalog {
89		StandardEngine::catalog(self)
90	}
91}
92
93// Engine methods (formerly from Engine trait in reifydb-core)
94impl StandardEngine {
95	#[instrument(name = "engine::transaction::begin_command", level = "debug", skip(self))]
96	pub fn begin_command(&self, identity: IdentityId) -> Result<CommandTransaction> {
97		let interceptors = self.interceptors.create();
98		let mut txn = CommandTransaction::new(
99			self.multi.clone(),
100			self.single.clone(),
101			self.event_bus.clone(),
102			interceptors,
103			identity,
104			self.executor.runtime_context.clock.clone(),
105		)?;
106		txn.set_executor(Arc::new(self.executor.clone()));
107		Ok(txn)
108	}
109
110	#[instrument(name = "engine::transaction::begin_admin", level = "debug", skip(self))]
111	pub fn begin_admin(&self, identity: IdentityId) -> Result<AdminTransaction> {
112		let interceptors = self.interceptors.create();
113		let mut txn = AdminTransaction::new(
114			self.multi.clone(),
115			self.single.clone(),
116			self.event_bus.clone(),
117			interceptors,
118			identity,
119			self.executor.runtime_context.clock.clone(),
120		)?;
121		txn.set_executor(Arc::new(self.executor.clone()));
122		Ok(txn)
123	}
124
125	#[instrument(name = "engine::transaction::begin_query", level = "debug", skip(self))]
126	pub fn begin_query(&self, identity: IdentityId) -> Result<QueryTransaction> {
127		let mut txn = QueryTransaction::new(self.multi.begin_query()?, self.single.clone(), identity);
128		txn.set_executor(Arc::new(self.executor.clone()));
129		Ok(txn)
130	}
131
132	/// Get the runtime clock for timestamp operations.
133	pub fn clock(&self) -> &Clock {
134		&self.executor.runtime_context.clock
135	}
136
137	pub fn rng(&self) -> &Rng {
138		&self.executor.runtime_context.rng
139	}
140
141	#[instrument(name = "engine::admin_as", level = "debug", skip(self, params), fields(rql = %rql))]
142	pub fn admin_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
143		if let Err(e) = self.reject_if_read_only() {
144			return ExecutionResult {
145				frames: vec![],
146				error: Some(e),
147				metrics: ExecutionMetrics::default(),
148			};
149		}
150		let mut txn = match self.begin_admin(identity) {
151			Ok(t) => t,
152			Err(mut e) => {
153				e.with_rql(rql.to_string());
154				return ExecutionResult {
155					frames: vec![],
156					error: Some(e),
157					metrics: ExecutionMetrics::default(),
158				};
159			}
160		};
161		let mut outcome = self.executor.admin(
162			&mut txn,
163			Admin {
164				rql,
165				params,
166			},
167		);
168		if outcome.is_ok()
169			&& let Err(mut e) = txn.commit()
170		{
171			e.with_rql(rql.to_string());
172			outcome.error = Some(e);
173		}
174		if let Some(ref mut e) = outcome.error {
175			e.with_rql(rql.to_string());
176		}
177		outcome
178	}
179
180	#[instrument(name = "engine::command_as", level = "debug", skip(self, params), fields(rql = %rql))]
181	pub fn command_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
182		if let Err(e) = self.reject_if_read_only() {
183			return ExecutionResult {
184				frames: vec![],
185				error: Some(e),
186				metrics: ExecutionMetrics::default(),
187			};
188		}
189		let mut txn = match self.begin_command(identity) {
190			Ok(t) => t,
191			Err(mut e) => {
192				e.with_rql(rql.to_string());
193				return ExecutionResult {
194					frames: vec![],
195					error: Some(e),
196					metrics: ExecutionMetrics::default(),
197				};
198			}
199		};
200		let mut outcome = self.executor.command(
201			&mut txn,
202			Command {
203				rql,
204				params,
205			},
206		);
207		if outcome.is_ok()
208			&& let Err(mut e) = txn.commit()
209		{
210			e.with_rql(rql.to_string());
211			outcome.error = Some(e);
212		}
213		if let Some(ref mut e) = outcome.error {
214			e.with_rql(rql.to_string());
215		}
216		outcome
217	}
218
219	#[instrument(name = "engine::query_as", level = "debug", skip(self, params), fields(rql = %rql))]
220	pub fn query_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
221		let mut txn = match self.begin_query(identity) {
222			Ok(t) => t,
223			Err(mut e) => {
224				e.with_rql(rql.to_string());
225				return ExecutionResult {
226					frames: vec![],
227					error: Some(e),
228					metrics: ExecutionMetrics::default(),
229				};
230			}
231		};
232		let mut outcome = self.executor.query(
233			&mut txn,
234			Query {
235				rql,
236				params,
237			},
238		);
239		if let Some(ref mut e) = outcome.error {
240			e.with_rql(rql.to_string());
241		}
242		outcome
243	}
244
245	#[instrument(name = "engine::subscribe_as", level = "debug", skip(self, params), fields(rql = %rql))]
246	pub fn subscribe_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
247		let mut txn = match self.begin_query(identity) {
248			Ok(t) => t,
249			Err(mut e) => {
250				e.with_rql(rql.to_string());
251				return ExecutionResult {
252					frames: vec![],
253					error: Some(e),
254					metrics: ExecutionMetrics::default(),
255				};
256			}
257		};
258		let mut outcome = self.executor.subscription(
259			&mut txn,
260			Subscription {
261				rql,
262				params,
263			},
264		);
265		if let Some(ref mut e) = outcome.error {
266			e.with_rql(rql.to_string());
267		}
268		outcome
269	}
270
271	/// Call a procedure by fully-qualified name.
272	#[instrument(name = "engine::procedure_as", level = "debug", skip(self, params), fields(name = %name))]
273	pub fn procedure_as(&self, identity: IdentityId, name: &str, params: Params) -> ExecutionResult {
274		if let Err(e) = self.reject_if_read_only() {
275			return ExecutionResult {
276				frames: vec![],
277				error: Some(e),
278				metrics: ExecutionMetrics::default(),
279			};
280		}
281		let mut txn = match self.begin_command(identity) {
282			Ok(t) => t,
283			Err(e) => {
284				return ExecutionResult {
285					frames: vec![],
286					error: Some(e),
287					metrics: ExecutionMetrics::default(),
288				};
289			}
290		};
291		let mut outcome = self.executor.call_procedure(&mut txn, name, &params);
292		if outcome.is_ok()
293			&& let Err(e) = txn.commit()
294		{
295			outcome.error = Some(e);
296		}
297		outcome
298	}
299
300	/// Register a user-defined virtual table.
301	///
302	/// The virtual table will be available for queries using the given namespace and name.
303	///
304	/// # Arguments
305	///
306	/// * `namespace` - The namespace name (e.g., "default", "my_namespace")
307	/// * `name` - The table name
308	/// * `table` - The virtual table implementation
309	///
310	/// # Returns
311	///
312	/// The assigned `VTableId` on success.
313	///
314	/// # Example
315	///
316	/// ```ignore
317	/// use reifydb_engine::vtable::{UserVTable, UserVTableColumn};
318	/// use reifydb_type::value::r#type::Type;
319	/// use reifydb_core::value::Columns;
320	///
321	/// #[derive(Clone)]
322	/// struct MyTable;
323	///
324	/// impl UserVTable for MyTable {
325	///     fn definition(&self) -> Vec<UserVTableColumn> {
326	///         vec![UserVTableColumn::new("id", Type::Uint8)]
327	///     }
328	///     fn get(&self) -> Columns {
329	///         // Return column-oriented data
330	///         Columns::empty()
331	///     }
332	/// }
333	///
334	/// let id = engine.register_virtual_table("default", "my_table", MyTable)?;
335	/// ```
336	pub fn register_virtual_table<T: UserVTable>(&self, namespace: &str, name: &str, table: T) -> Result<VTableId> {
337		let catalog = self.materialized_catalog();
338
339		// Look up namespace by name (use max u64 to get latest version)
340		let ns_def = catalog
341			.find_namespace_by_name(namespace)
342			.ok_or_else(|| Error(Box::new(namespace_not_found(Fragment::None, namespace))))?;
343
344		// Allocate a new table ID
345		let table_id = self.executor.virtual_table_registry.allocate_id();
346		// Convert user column definitions to internal column definitions
347		let table_columns = table.vtable();
348		let columns = convert_vtable_user_columns_to_columns(&table_columns);
349
350		// Create the table definition
351		let def = Arc::new(VTable {
352			id: table_id,
353			namespace: ns_def.id(),
354			name: name.to_string(),
355			columns,
356		});
357
358		// Register in catalog (for resolver lookups)
359		catalog.register_vtable_user(def.clone())?;
360		// Create the data function from the UserVTable trait
361		let data_fn: UserVTableDataFunction = Arc::new(move |_params| table.get());
362		// Create and register the entry
363		let entry = UserVTableEntry {
364			def: def.clone(),
365			data_fn,
366		};
367		self.executor.virtual_table_registry.register(ns_def.id(), name.to_string(), entry);
368		Ok(table_id)
369	}
370}
371
372impl CdcHost for StandardEngine {
373	fn begin_command(&self) -> Result<CommandTransaction> {
374		StandardEngine::begin_command(self, IdentityId::system())
375	}
376
377	fn begin_query(&self) -> Result<QueryTransaction> {
378		StandardEngine::begin_query(self, IdentityId::system())
379	}
380
381	fn current_version(&self) -> Result<CommitVersion> {
382		StandardEngine::current_version(self)
383	}
384
385	fn done_until(&self) -> CommitVersion {
386		StandardEngine::done_until(self)
387	}
388
389	fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
390		StandardEngine::wait_for_mark_timeout(self, version, timeout)
391	}
392
393	fn materialized_catalog(&self) -> &MaterializedCatalog {
394		&self.catalog.materialized
395	}
396}
397
398impl Clone for StandardEngine {
399	fn clone(&self) -> Self {
400		Self(self.0.clone())
401	}
402}
403
404impl Deref for StandardEngine {
405	type Target = Inner;
406
407	fn deref(&self) -> &Self::Target {
408		&self.0
409	}
410}
411
412pub struct Inner {
413	multi: MultiTransaction,
414	single: SingleTransaction,
415	event_bus: EventBus,
416	executor: Executor,
417	interceptors: Arc<InterceptorFactory>,
418	catalog: Catalog,
419	flow_operator_store: SystemFlowOperatorStore,
420	read_only: AtomicBool,
421}
422
423impl StandardEngine {
424	pub fn new(
425		multi: MultiTransaction,
426		single: SingleTransaction,
427		event_bus: EventBus,
428		interceptors: InterceptorFactory,
429		catalog: Catalog,
430		config: EngineConfig,
431	) -> Self {
432		let flow_operator_store = SystemFlowOperatorStore::new();
433		let listener = SystemFlowOperatorEventListener::new(flow_operator_store.clone());
434		event_bus.register(listener);
435
436		// Get the metrics store from IoC to create the stats reader
437		let metrics_store = config
438			.ioc
439			.resolve::<SingleStore>()
440			.expect("SingleStore must be registered in IocContainer for metrics");
441		let stats_reader = MetricReader::new(metrics_store);
442
443		// Register MaterializedCatalogInterceptor as a factory function.
444		let materialized = catalog.materialized.clone();
445		interceptors.add_late(Arc::new(move |interceptors: &mut Interceptors| {
446			interceptors
447				.post_commit
448				.add(Arc::new(MaterializedCatalogInterceptor::new(materialized.clone())));
449		}));
450
451		let interceptors = Arc::new(interceptors);
452
453		Self(Arc::new(Inner {
454			multi,
455			single,
456			event_bus,
457			executor: Executor::new(catalog.clone(), config, flow_operator_store.clone(), stats_reader),
458			interceptors,
459			catalog,
460			flow_operator_store,
461			read_only: AtomicBool::new(false),
462		}))
463	}
464
465	/// Create a new set of interceptors from the factory.
466	pub fn create_interceptors(&self) -> Interceptors {
467		self.interceptors.create()
468	}
469
470	/// Register an additional interceptor factory function.
471	///
472	/// The function will be called on every `create()` to augment the base interceptors.
473	/// This is thread-safe and can be called after the engine is constructed (e.g. by subsystems).
474	pub fn add_interceptor_factory(&self, factory: Arc<dyn Fn(&mut Interceptors) + Send + Sync>) {
475		self.interceptors.add_late(factory);
476	}
477
478	/// Begin a query transaction at a specific version.
479	///
480	/// This is used for parallel query execution where multiple tasks need to
481	/// read from the same snapshot (same CommitVersion) for consistency.
482	#[instrument(name = "engine::transaction::begin_query_at_version", level = "debug", skip(self), fields(version = %version.0
483    ))]
484	pub fn begin_query_at_version(&self, version: CommitVersion, identity: IdentityId) -> Result<QueryTransaction> {
485		let mut txn = QueryTransaction::new(
486			self.multi.begin_query_at_version(version)?,
487			self.single.clone(),
488			identity,
489		);
490		txn.set_executor(Arc::new(self.executor.clone()));
491		Ok(txn)
492	}
493
494	#[inline]
495	pub fn multi(&self) -> &MultiTransaction {
496		&self.multi
497	}
498
499	#[inline]
500	pub fn multi_owned(&self) -> MultiTransaction {
501		self.multi.clone()
502	}
503
504	/// Get the actor system
505	#[inline]
506	pub fn actor_system(&self) -> ActorSystem {
507		self.multi.actor_system()
508	}
509
510	#[inline]
511	pub fn single(&self) -> &SingleTransaction {
512		&self.single
513	}
514
515	#[inline]
516	pub fn single_owned(&self) -> SingleTransaction {
517		self.single.clone()
518	}
519
520	#[inline]
521	pub fn emit<E: Event>(&self, event: E) {
522		self.event_bus.emit(event)
523	}
524
525	#[inline]
526	pub fn materialized_catalog(&self) -> &MaterializedCatalog {
527		&self.catalog.materialized
528	}
529
530	/// Returns a `Catalog` instance for catalog lookups.
531	/// The Catalog provides three-tier lookup methods that check transactional changes,
532	/// then MaterializedCatalog, then fall back to storage.
533	#[inline]
534	pub fn catalog(&self) -> Catalog {
535		self.catalog.clone()
536	}
537
538	/// Returns the shared `Services` instance used by this engine's executor.
539	/// External consumers that want to drive volcano operators directly (e.g.
540	/// subsystems that build a `QueryContext`) read from the same `Services`
541	/// the engine already initialised - avoids duplicating the `Services::new`
542	/// wiring path.
543	#[inline]
544	pub fn services(&self) -> Arc<Services> {
545		self.executor.services().clone()
546	}
547
548	#[inline]
549	pub fn flow_operator_store(&self) -> &SystemFlowOperatorStore {
550		&self.flow_operator_store
551	}
552
553	/// Get the current version from the transaction manager
554	#[inline]
555	pub fn current_version(&self) -> Result<CommitVersion> {
556		self.multi.current_version()
557	}
558
559	/// Returns the highest version where ALL prior versions have completed.
560	/// This is useful for CDC polling to know the safe upper bound for fetching
561	/// CDC events - all events up to this version are guaranteed to be in storage.
562	#[inline]
563	pub fn done_until(&self) -> CommitVersion {
564		self.multi.done_until()
565	}
566
567	/// Wait for the watermark to reach the given version with a timeout.
568	/// Returns true if the watermark reached the target, false if timeout occurred.
569	#[inline]
570	pub fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
571		self.multi.wait_for_mark_timeout(version, timeout)
572	}
573
574	#[inline]
575	pub fn executor(&self) -> Executor {
576		self.executor.clone()
577	}
578
579	/// Borrow the IoC container backing this engine. Used by callers that need
580	/// to resolve services registered during construction (e.g. observability
581	/// providers).
582	#[inline]
583	pub fn ioc(&self) -> &IocContainer {
584		&self.executor.ioc
585	}
586
587	/// Get the CDC store from the IoC container.
588	///
589	/// Returns the CdcStore that was registered during engine construction.
590	/// Panics if CdcStore was not registered.
591	#[inline]
592	pub fn cdc_store(&self) -> CdcStore {
593		self.executor.ioc.resolve::<CdcStore>().expect("CdcStore must be registered")
594	}
595
596	/// Resolve an actor handle by message type.
597	///
598	/// Returns `None` if no actor for `M` was registered during engine
599	/// construction (e.g. the CDC compact actor is only registered for
600	/// persistent backends).
601	#[inline]
602	pub fn actor<M: 'static>(&self) -> Option<ActorRef<M>>
603	where
604		ActorRef<M>: Send + Sync,
605	{
606		self.executor.ioc.try_resolve::<ActorRef<M>>()
607	}
608
609	/// Highest commit version processed by the CDC producer actor.
610	///
611	/// Once this returns `>= V`, every `PostCommitEvent` for versions `<= V`
612	/// has been fully handled by the producer, so any CDC row it was going
613	/// to write is in storage. Unlike `cdc_store().max_version()`, this
614	/// advances even for commits whose deltas were entirely filtered out by
615	/// `should_exclude_from_cdc` (e.g. ConfigStorage-only commits), so it is
616	/// the correct frontier for "producer is caught up to the engine".
617	#[inline]
618	pub fn cdc_producer_watermark(&self) -> CommitVersion {
619		self.executor
620			.ioc
621			.resolve::<CdcProducerWatermark>()
622			.expect("CdcProducerWatermark must be registered")
623			.get()
624	}
625
626	/// Mark this engine as read-only (replica mode).
627	/// Once set, all write-path methods will return ENG_007 immediately.
628	pub fn set_read_only(&self) {
629		self.read_only.store(true, Ordering::SeqCst);
630	}
631
632	/// Whether this engine is in read-only (replica) mode.
633	pub fn is_read_only(&self) -> bool {
634		self.read_only.load(Ordering::SeqCst)
635	}
636
637	pub(crate) fn reject_if_read_only(&self) -> Result<()> {
638		if self.is_read_only() {
639			return Err(Error(Box::new(read_only_rejection(Fragment::None))));
640		}
641		Ok(())
642	}
643
644	pub fn shutdown(&self) {
645		self.interceptors.clear_late();
646		self.executor.ioc.clear();
647	}
648
649	/// Start a bulk insert operation with full validation.
650	///
651	/// This provides a fluent API for fast bulk inserts that bypasses RQL parsing.
652	/// All inserts within a single builder execute in one transaction.
653	///
654	/// # Example
655	///
656	/// ```ignore
657	/// use reifydb_type::params;
658	///
659	/// engine.bulk_insert(&identity)
660	///     .table("namespace.users")
661	///         .row(params!{ id: 1, name: "Alice" })
662	///         .row(params!{ id: 2, name: "Bob" })
663	///         .done()
664	///     .execute()?;
665	/// ```
666	pub fn bulk_insert<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Validated> {
667		BulkInsertBuilder::new(self, identity)
668	}
669
670	/// Start a bulk insert that bypasses BOTH constraint validation AND the
671	/// oracle's per-key conflict-detection index ("unchecked" mode).
672	///
673	/// # What this skips beyond `bulk_insert`
674	///
675	/// `bulk_insert` (the validated default) performs full type/constraint
676	/// validation and registers the commit's write set in the oracle's
677	/// conflict-detection time-windows so that any concurrent OCC transaction
678	/// whose read set overlaps these writes will be aborted at its own commit
679	/// time.
680	///
681	/// `bulk_insert_unchecked` skips both. The commit version still advances
682	/// and the watermark still progresses, so any transaction that reads at
683	/// version >= this commit will observe the new rows. But concurrent OCC
684	/// transactions that already started reading at an older version will
685	/// NOT detect that this commit happened underneath them.
686	///
687	/// # Safety contract - when this is sound
688	///
689	/// Use this method ONLY when ALL of the following hold for the calling
690	/// context:
691	///
692	/// 1. **Single writer.** This commit is the only writer touching the rows it inserts. No other thread / process
693	///    / connection is writing to the same keys concurrently. (For chain ingest: the block-stream consumer is
694	///    the only writer, and blocks arrive in monotonic order.)
695	///
696	/// 2. **No concurrent OCC reader needs to be invalidated.** Any OCC transaction reading at an older version
697	///    will silently miss this commit's writes when computing its own conflict set. If your workload has
698	///    concurrent user transactions that read these rows, they will commit successfully despite a logical
699	///    conflict, and they will see stale data on retry. For trusted ingest where "downstream" readers are
700	///    streaming-view operators that consume each new commit on its own merits (not via OCC retry), this is
701	///    fine.
702	///
703	/// 3. **Caller-side well-formedness.** Validation is skipped, so primary key violations or constraint failures
704	///    will surface as storage errors at insert time rather than as transaction-level conflicts. The caller must
705	///    already ensure the data conforms to the table/ringbuffer shape.
706	///
707	/// 4. **No need to abort on overlap.** OCC normally aborts a writer whose read set was modified by a more
708	///    recent committer. Skipping the index means a concurrent OCC writer with an overlapping read set will
709	///    commit through. For trusted ingest where there is no competing OCC writer, this is irrelevant.
710	///
711	/// In short: safe for sequential, single-writer, append-mostly trusted
712	/// ingest where downstream readers don't rely on OCC abort-on-overlap.
713	/// Unsafe (silently incorrect) for any workload with concurrent OCC
714	/// transactions that read these keys and rely on conflict detection
715	/// for correctness.
716	pub fn bulk_insert_unchecked<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Unchecked> {
717		BulkInsertBuilder::new_unchecked(self, identity)
718	}
719}
720
721/// Convert user column definitions to internal Column format.
722fn convert_vtable_user_columns_to_columns(columns: &[UserVTableColumn]) -> Vec<Column> {
723	columns.iter()
724		.enumerate()
725		.map(|(idx, col)| {
726			// Note: For virtual tables, we use unconstrained for all types.
727			// The nullable field is still available for documentation purposes.
728			let constraint = TypeConstraint::unconstrained(col.data_type.clone());
729			Column {
730				id: ColumnId(idx as u64),
731				name: col.name.clone(),
732				constraint,
733				properties: vec![],
734				index: ColumnIndex(idx as u8),
735				auto_increment: false,
736				dictionary_id: None,
737			}
738		})
739		.collect()
740}