Skip to main content

uni_db/api/
sync.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::api::Uni;
5use crate::api::locy_builder::{LocyBuilder, TxLocyBuilder};
6use crate::api::session::{QueryBuilder, Session, TransactionBuilder};
7use crate::api::transaction::{
8    ApplyBuilder, ApplyResult, CommitResult, Transaction, TxQueryBuilder,
9};
10use std::sync::Arc;
11use uni_common::core::schema::{DataType, Schema};
12use uni_common::{Result, UniError, Value};
13use uni_locy::DerivedFactSet;
14
15use crate::api::locy_result::LocyResult;
16use uni_query::{ExecuteResult, QueryResult, Row};
17
18/// Blocking API wrapper for Uni.
19pub struct UniSync {
20    inner: Option<Uni>,
21    rt: tokio::runtime::Runtime,
22}
23
24impl UniSync {
25    pub fn new(inner: Uni) -> Result<Self> {
26        let rt = tokio::runtime::Runtime::new().map_err(UniError::Io)?;
27        Ok(Self {
28            inner: Some(inner),
29            rt,
30        })
31    }
32
33    /// Open an in-memory database (blocking)
34    pub fn in_memory() -> Result<Self> {
35        let rt = tokio::runtime::Runtime::new().map_err(UniError::Io)?;
36        let inner = rt.block_on(Uni::in_memory().build())?;
37        Ok(Self {
38            inner: Some(inner),
39            rt,
40        })
41    }
42
43    fn inner(&self) -> &Uni {
44        self.inner.as_ref().expect("UniSync already shut down")
45    }
46
47    /// Create a new session (sync wrapper).
48    pub fn session(&self) -> SessionSync<'_> {
49        SessionSync {
50            session: self.inner().session(),
51            rt: &self.rt,
52        }
53    }
54
55    pub fn schema_meta(&self) -> Arc<Schema> {
56        self.inner().schema().current()
57    }
58
59    pub fn schema(&self) -> SchemaBuilderSync<'_> {
60        SchemaBuilderSync {
61            inner: self.inner().schema(),
62            rt: &self.rt,
63        }
64    }
65
66    /// Shutdown the database gracefully (blocking).
67    ///
68    /// Note: This consumes self, which prevents the Drop impl from also
69    /// triggering shutdown. Use this for explicit shutdown with error handling.
70    pub fn shutdown(mut self) -> Result<()> {
71        // Take ownership of the inner Uni to prevent Drop from also running
72        if let Some(uni) = self.inner.take() {
73            let result = self.rt.block_on(uni.shutdown());
74
75            // Prevent Drop from running by forgetting self
76            // (we've already done the cleanup in the async shutdown)
77            std::mem::forget(self);
78
79            result
80        } else {
81            Ok(()) // Already shut down
82        }
83    }
84}
85
86impl Drop for UniSync {
87    fn drop(&mut self) {
88        if let Some(ref uni) = self.inner {
89            uni.inner.shutdown_handle.shutdown_blocking();
90            tracing::debug!("UniSync dropped");
91        }
92    }
93}
94
95// ── SessionSync ──────────────────────────────────────────────────────────
96
97/// Blocking wrapper around [`Session`].
98///
99/// All async methods on `Session` are wrapped with `block_on()` to provide
100/// a synchronous API. Created via [`UniSync::session()`].
101pub struct SessionSync<'a> {
102    session: Session,
103    rt: &'a tokio::runtime::Runtime,
104}
105
106impl<'a> SessionSync<'a> {
107    // ── Cypher Reads ──────────────────────────────────────────────────
108
109    /// Execute a read-only Cypher query.
110    pub fn query(&self, cypher: &str) -> Result<QueryResult> {
111        self.rt.block_on(self.session.query(cypher))
112    }
113
114    /// Execute a read-only Cypher query with a builder for parameters.
115    pub fn query_with<'s>(&'s self, cypher: &str) -> QueryBuilderSync<'s, 'a> {
116        QueryBuilderSync {
117            inner: self.session.query_with(cypher),
118            rt: self.rt,
119        }
120    }
121
122    // ── Locy Evaluation ───────────────────────────────────────────────
123
124    /// Evaluate a Locy program with default configuration.
125    pub fn locy(&self, program: &str) -> Result<LocyResult> {
126        self.rt.block_on(self.session.locy(program))
127    }
128
129    /// Evaluate a Locy program with parameters using a builder.
130    pub fn locy_with<'s>(&'s self, program: &str) -> LocyBuilderSync<'s, 'a> {
131        LocyBuilderSync {
132            inner: self.session.locy_with(program),
133            rt: self.rt,
134        }
135    }
136
137    // ── Rule Management ───────────────────────────────────────────────
138
139    /// Access the session-scoped rule registry.
140    pub fn rules(&self) -> crate::api::rule_registry::RuleRegistry<'_> {
141        self.session.rules()
142    }
143
144    /// Compile a Locy program without executing it.
145    pub fn compile_locy(&self, program: &str) -> Result<uni_locy::CompiledProgram> {
146        self.session.compile_locy(program)
147    }
148
149    // ── Custom Functions ──────────────────────────────────────────────
150
151    // ── Transactions ──────────────────────────────────────────────────
152
153    /// Create a new transaction for multi-statement writes.
154    pub fn tx(&self) -> Result<TransactionSync<'a>> {
155        let tx = self.rt.block_on(self.session.tx())?;
156        Ok(TransactionSync { tx, rt: self.rt })
157    }
158
159    /// Create a transaction with builder options (timeout, isolation level).
160    pub fn tx_with(&self) -> TransactionBuilderSync<'_, 'a> {
161        TransactionBuilderSync {
162            inner: self.session.tx_with(),
163            rt: self.rt,
164        }
165    }
166
167    // ── Commit Notifications ─────────────────────────────────────────
168
169    /// Watch for all commit notifications.
170    pub fn watch(&self) -> crate::api::notifications::CommitStream {
171        self.session.watch()
172    }
173
174    /// Watch for commit notifications with filters.
175    pub fn watch_with(&self) -> crate::api::notifications::WatchBuilder {
176        self.session.watch_with()
177    }
178
179    // ── Hooks ─────────────────────────────────────────────────────────
180
181    /// Add a named session hook for query/commit interception.
182    pub fn add_hook(
183        &mut self,
184        name: impl Into<String>,
185        hook: impl crate::api::hooks::SessionHook + 'static,
186    ) {
187        self.session.add_hook(name, hook)
188    }
189
190    /// Remove a hook by name. Returns true if it existed.
191    pub fn remove_hook(&mut self, name: &str) -> bool {
192        self.session.remove_hook(name)
193    }
194
195    /// List names of all registered hooks.
196    pub fn list_hooks(&self) -> Vec<String> {
197        self.session.list_hooks()
198    }
199
200    /// Remove all hooks.
201    pub fn clear_hooks(&mut self) {
202        self.session.clear_hooks()
203    }
204
205    // ── Version Pinning ──────────────────────────────────────────────
206
207    /// Pin this session to a specific snapshot version.
208    pub fn pin_to_version(&mut self, snapshot_id: &str) -> Result<()> {
209        self.rt.block_on(self.session.pin_to_version(snapshot_id))
210    }
211
212    /// Pin this session to a specific timestamp.
213    pub fn pin_to_timestamp(&mut self, ts: chrono::DateTime<chrono::Utc>) -> Result<()> {
214        self.rt.block_on(self.session.pin_to_timestamp(ts))
215    }
216
217    /// Unpin the session, returning to the live database state.
218    pub fn refresh(&mut self) -> Result<()> {
219        self.rt.block_on(self.session.refresh())
220    }
221
222    // ── Prepared Statements ──────────────────────────────────────────
223
224    /// Prepare a Cypher query for repeated execution.
225    pub fn prepare(&self, cypher: &str) -> Result<crate::api::prepared::PreparedQuery> {
226        self.rt.block_on(self.session.prepare(cypher))
227    }
228
229    /// Prepare a Locy program for repeated evaluation.
230    pub fn prepare_locy(&self, program: &str) -> Result<crate::api::prepared::PreparedLocy> {
231        self.rt.block_on(self.session.prepare_locy(program))
232    }
233
234    // ── Scoped Parameters ─────────────────────────────────────────────
235
236    /// Access the session-scoped parameter store.
237    pub fn params(&self) -> crate::api::session::Params<'_> {
238        self.session.params()
239    }
240
241    // ── Lifecycle & Observability ─────────────────────────────────────
242
243    /// Get the session ID.
244    pub fn id(&self) -> &str {
245        self.session.id()
246    }
247
248    /// Query the capabilities of this session.
249    pub fn capabilities(&self) -> crate::api::session::SessionCapabilities {
250        self.session.capabilities()
251    }
252
253    /// Snapshot the session's accumulated metrics.
254    pub fn metrics(&self) -> crate::api::session::SessionMetrics {
255        self.session.metrics()
256    }
257
258    /// Cancel all in-flight queries in this session.
259    pub fn cancel(&self) {
260        self.session.cancel()
261    }
262}
263
264// ── QueryBuilderSync ──────────────────────────────────────────────
265
266/// Blocking wrapper around [`QueryBuilder`].
267pub struct QueryBuilderSync<'s, 'a> {
268    inner: QueryBuilder<'s>,
269    rt: &'a tokio::runtime::Runtime,
270}
271
272impl<'s, 'a> QueryBuilderSync<'s, 'a> {
273    /// Bind a parameter to the query.
274    pub fn param<K: Into<String>, V: Into<Value>>(mut self, key: K, value: V) -> Self {
275        self.inner = self.inner.param(key, value);
276        self
277    }
278
279    /// Bind multiple parameters from an iterator.
280    pub fn params<'p>(mut self, params: impl IntoIterator<Item = (&'p str, Value)>) -> Self {
281        self.inner = self.inner.params(params);
282        self
283    }
284
285    /// Set maximum execution time for this query.
286    pub fn timeout(mut self, duration: std::time::Duration) -> Self {
287        self.inner = self.inner.timeout(duration);
288        self
289    }
290
291    /// Set maximum memory per query in bytes.
292    pub fn max_memory(mut self, bytes: usize) -> Self {
293        self.inner = self.inner.max_memory(bytes);
294        self
295    }
296
297    /// Execute the query and fetch all results.
298    pub fn fetch_all(self) -> Result<QueryResult> {
299        self.rt.block_on(self.inner.fetch_all())
300    }
301
302    /// Execute the query and return the first row, or `None` if empty.
303    pub fn fetch_one(self) -> Result<Option<Row>> {
304        self.rt.block_on(self.inner.fetch_one())
305    }
306}
307
308// ── LocyBuilderSync ──────────────────────────────────────────────
309
310/// Blocking wrapper around [`LocyBuilder`].
311pub struct LocyBuilderSync<'s, 'a> {
312    inner: LocyBuilder<'s>,
313    rt: &'a tokio::runtime::Runtime,
314}
315
316impl<'s, 'a> LocyBuilderSync<'s, 'a> {
317    /// Bind a single parameter.
318    pub fn param(mut self, name: &str, value: impl Into<Value>) -> Self {
319        self.inner = self.inner.param(name, value);
320        self
321    }
322
323    /// Bind multiple parameters from an iterator.
324    pub fn params<'p>(mut self, params: impl IntoIterator<Item = (&'p str, Value)>) -> Self {
325        self.inner = self.inner.params(params);
326        self
327    }
328
329    /// Override the evaluation timeout.
330    pub fn timeout(mut self, duration: std::time::Duration) -> Self {
331        self.inner = self.inner.timeout(duration);
332        self
333    }
334
335    /// Override the maximum fixpoint iteration count.
336    pub fn max_iterations(mut self, n: usize) -> Self {
337        self.inner = self.inner.max_iterations(n);
338        self
339    }
340
341    /// Apply a fully configured [`LocyConfig`](uni_locy::LocyConfig).
342    pub fn with_config(mut self, config: uni_locy::LocyConfig) -> Self {
343        self.inner = self.inner.with_config(config);
344        self
345    }
346
347    /// Evaluate the program and return the full [`LocyResult`].
348    pub fn run(self) -> Result<LocyResult> {
349        self.rt.block_on(self.inner.run())
350    }
351}
352
353// ── TransactionSync ──────────────────────────────────────────────────────
354
355pub struct TransactionSync<'a> {
356    tx: Transaction,
357    rt: &'a tokio::runtime::Runtime,
358}
359
360impl<'a> TransactionSync<'a> {
361    pub fn query(&self, cypher: &str) -> Result<QueryResult> {
362        self.rt.block_on(self.tx.query(cypher))
363    }
364
365    /// Execute a Cypher query with parameters using a builder.
366    pub fn query_with<'t>(&'t self, cypher: &str) -> TxQueryBuilderSync<'t, 'a> {
367        TxQueryBuilderSync {
368            inner: self.tx.query_with(cypher),
369            rt: self.rt,
370        }
371    }
372
373    pub fn execute(&self, cypher: &str) -> Result<ExecuteResult> {
374        self.rt.block_on(self.tx.execute(cypher))
375    }
376
377    /// Execute a mutation with parameters using a builder.
378    pub fn execute_with<'t>(&'t self, cypher: &str) -> ExecuteBuilderSync<'t, 'a> {
379        ExecuteBuilderSync {
380            inner: self.tx.execute_with(cypher),
381            rt: self.rt,
382        }
383    }
384
385    /// Evaluate a Locy program within the transaction.
386    pub fn locy(&self, program: &str) -> Result<LocyResult> {
387        self.rt.block_on(self.tx.locy(program))
388    }
389
390    /// Evaluate a Locy program with parameters using a builder.
391    pub fn locy_with<'t>(&'t self, program: &str) -> TxLocyBuilderSync<'t, 'a> {
392        TxLocyBuilderSync {
393            inner: self.tx.locy_with(program),
394            rt: self.rt,
395        }
396    }
397
398    /// Apply a `DerivedFactSet` to this transaction.
399    pub fn apply(&self, derived: DerivedFactSet) -> Result<ApplyResult> {
400        self.rt.block_on(self.tx.apply(derived))
401    }
402
403    /// Apply a `DerivedFactSet` with staleness controls.
404    pub fn apply_with(&self, derived: DerivedFactSet) -> ApplyBuilderSync<'_, 'a> {
405        ApplyBuilderSync {
406            inner: self.tx.apply_with(derived),
407            rt: self.rt,
408        }
409    }
410
411    /// Prepare a Cypher query for repeated execution.
412    pub fn prepare(&self, cypher: &str) -> Result<crate::api::prepared::PreparedQuery> {
413        self.rt.block_on(self.tx.prepare(cypher))
414    }
415
416    /// Prepare a Locy program for repeated evaluation.
417    pub fn prepare_locy(&self, program: &str) -> Result<crate::api::prepared::PreparedLocy> {
418        self.rt.block_on(self.tx.prepare_locy(program))
419    }
420
421    pub fn commit(self) -> Result<CommitResult> {
422        self.rt.block_on(self.tx.commit())
423    }
424
425    pub fn rollback(self) {
426        self.tx.rollback()
427    }
428
429    /// Create a bulk writer builder for efficient data loading.
430    pub fn bulk_writer(&self) -> crate::api::bulk::BulkWriterBuilder {
431        self.tx.bulk_writer()
432    }
433
434    /// Create a streaming appender for row-by-row data loading.
435    pub fn appender(&self, label: &str) -> crate::api::appender::AppenderBuilder {
436        self.tx.appender(label)
437    }
438
439    /// Bulk insert vertices within this transaction.
440    pub fn bulk_insert_vertices(
441        &self,
442        label: &str,
443        properties_list: Vec<uni_common::Properties>,
444    ) -> Result<Vec<uni_common::core::id::Vid>> {
445        self.rt
446            .block_on(self.tx.bulk_insert_vertices(label, properties_list))
447    }
448
449    /// Bulk insert edges within this transaction.
450    pub fn bulk_insert_edges(
451        &self,
452        edge_type: &str,
453        edges: Vec<(
454            uni_common::core::id::Vid,
455            uni_common::core::id::Vid,
456            uni_common::Properties,
457        )>,
458    ) -> Result<()> {
459        self.rt
460            .block_on(self.tx.bulk_insert_edges(edge_type, edges))
461    }
462
463    /// Check if the transaction has uncommitted changes.
464    pub fn is_dirty(&self) -> bool {
465        self.tx.is_dirty()
466    }
467
468    /// Get the transaction ID.
469    pub fn id(&self) -> &str {
470        self.tx.id()
471    }
472}
473
474// ── ExecuteBuilderSync ──────────────────────────────────────────────────
475
476/// Blocking wrapper around [`ExecuteBuilder`](crate::api::transaction::ExecuteBuilder).
477pub struct ExecuteBuilderSync<'t, 'a> {
478    inner: crate::api::transaction::ExecuteBuilder<'t>,
479    rt: &'a tokio::runtime::Runtime,
480}
481
482impl<'t, 'a> ExecuteBuilderSync<'t, 'a> {
483    /// Bind a parameter to the mutation.
484    pub fn param<K: Into<String>, V: Into<Value>>(mut self, key: K, value: V) -> Self {
485        self.inner = self.inner.param(key, value);
486        self
487    }
488
489    /// Bind multiple parameters from an iterator.
490    pub fn params<'p>(mut self, params: impl IntoIterator<Item = (&'p str, Value)>) -> Self {
491        self.inner = self.inner.params(params);
492        self
493    }
494
495    /// Set maximum execution time for this mutation.
496    pub fn timeout(mut self, duration: std::time::Duration) -> Self {
497        self.inner = self.inner.timeout(duration);
498        self
499    }
500
501    /// Execute the mutation and return affected row count with detailed stats.
502    pub fn run(self) -> Result<ExecuteResult> {
503        self.rt.block_on(self.inner.run())
504    }
505}
506
507// ── TransactionBuilderSync ───────────────────────────────────────────────
508
509/// Blocking wrapper around [`TransactionBuilder`].
510pub struct TransactionBuilderSync<'s, 'a> {
511    inner: TransactionBuilder<'s>,
512    rt: &'a tokio::runtime::Runtime,
513}
514
515impl<'s, 'a> TransactionBuilderSync<'s, 'a> {
516    /// Set the transaction timeout.
517    pub fn timeout(mut self, d: std::time::Duration) -> Self {
518        self.inner = self.inner.timeout(d);
519        self
520    }
521
522    /// Set the isolation level.
523    pub fn isolation(mut self, level: crate::api::transaction::IsolationLevel) -> Self {
524        self.inner = self.inner.isolation(level);
525        self
526    }
527
528    /// Start the transaction.
529    pub fn start(self) -> Result<TransactionSync<'a>> {
530        let tx = self.rt.block_on(self.inner.start())?;
531        Ok(TransactionSync { tx, rt: self.rt })
532    }
533}
534
535// ── TxQueryBuilderSync ─────────────────────────────────────────
536
537/// Blocking wrapper around [`TxQueryBuilder`].
538pub struct TxQueryBuilderSync<'t, 'a> {
539    inner: TxQueryBuilder<'t>,
540    rt: &'a tokio::runtime::Runtime,
541}
542
543impl<'t, 'a> TxQueryBuilderSync<'t, 'a> {
544    /// Bind a parameter.
545    pub fn param(mut self, name: &str, value: impl Into<Value>) -> Self {
546        self.inner = self.inner.param(name, value);
547        self
548    }
549
550    /// Execute a mutation and return affected row count with detailed stats.
551    pub fn execute(self) -> Result<ExecuteResult> {
552        self.rt.block_on(self.inner.execute())
553    }
554
555    /// Execute as a query and return rows.
556    pub fn fetch_all(self) -> Result<QueryResult> {
557        self.rt.block_on(self.inner.fetch_all())
558    }
559
560    /// Execute the query and return the first row, or `None` if empty.
561    pub fn fetch_one(self) -> Result<Option<Row>> {
562        self.rt.block_on(self.inner.fetch_one())
563    }
564}
565
566// ── ApplyBuilderSync ────────────────────────────────────────────────────
567
568/// Blocking wrapper around [`ApplyBuilder`].
569pub struct ApplyBuilderSync<'t, 'a> {
570    inner: ApplyBuilder<'t>,
571    rt: &'a tokio::runtime::Runtime,
572}
573
574impl<'t, 'a> ApplyBuilderSync<'t, 'a> {
575    /// Require that no commits occurred between DERIVE evaluation and apply.
576    pub fn require_fresh(mut self) -> Self {
577        self.inner = self.inner.require_fresh();
578        self
579    }
580
581    /// Allow up to `n` versions of gap.
582    pub fn max_version_gap(mut self, n: u64) -> Self {
583        self.inner = self.inner.max_version_gap(n);
584        self
585    }
586
587    /// Execute the apply operation.
588    pub fn run(self) -> Result<ApplyResult> {
589        self.rt.block_on(self.inner.run())
590    }
591}
592
593// ── TxLocyBuilderSync ──────────────────────────────────────────
594
595/// Blocking wrapper around [`TxLocyBuilder`].
596pub struct TxLocyBuilderSync<'t, 'a> {
597    inner: TxLocyBuilder<'t>,
598    rt: &'a tokio::runtime::Runtime,
599}
600
601impl<'t, 'a> TxLocyBuilderSync<'t, 'a> {
602    /// Bind a single parameter.
603    pub fn param(mut self, name: &str, value: impl Into<Value>) -> Self {
604        self.inner = self.inner.param(name, value);
605        self
606    }
607
608    /// Bind multiple parameters from an iterator.
609    pub fn params<'p>(mut self, params: impl IntoIterator<Item = (&'p str, Value)>) -> Self {
610        self.inner = self.inner.params(params);
611        self
612    }
613
614    /// Override the evaluation timeout.
615    pub fn timeout(mut self, duration: std::time::Duration) -> Self {
616        self.inner = self.inner.timeout(duration);
617        self
618    }
619
620    /// Override the maximum fixpoint iteration count.
621    pub fn max_iterations(mut self, n: usize) -> Self {
622        self.inner = self.inner.max_iterations(n);
623        self
624    }
625
626    /// Apply a fully configured [`LocyConfig`](uni_locy::LocyConfig).
627    pub fn with_config(mut self, config: uni_locy::LocyConfig) -> Self {
628        self.inner = self.inner.with_config(config);
629        self
630    }
631
632    /// Evaluate the program and return the full [`LocyResult`].
633    pub fn run(self) -> Result<LocyResult> {
634        self.rt.block_on(self.inner.run())
635    }
636}
637
638// ── Schema Builders (unchanged) ──────────────────────────────────────────
639
640pub struct SchemaBuilderSync<'a> {
641    inner: crate::api::schema::SchemaBuilder<'a>,
642    rt: &'a tokio::runtime::Runtime,
643}
644
645impl<'a> SchemaBuilderSync<'a> {
646    pub fn label(self, name: &str) -> LabelBuilderSync<'a> {
647        LabelBuilderSync {
648            inner: self.inner.label(name),
649            rt: self.rt,
650        }
651    }
652
653    pub fn edge_type(self, name: &str, from: &[&str], to: &[&str]) -> EdgeTypeBuilderSync<'a> {
654        EdgeTypeBuilderSync {
655            inner: self.inner.edge_type(name, from, to),
656            rt: self.rt,
657        }
658    }
659
660    pub fn apply(self) -> Result<()> {
661        self.rt.block_on(self.inner.apply())
662    }
663}
664
665pub struct LabelBuilderSync<'a> {
666    inner: crate::api::schema::LabelBuilder<'a>,
667    rt: &'a tokio::runtime::Runtime,
668}
669
670impl<'a> LabelBuilderSync<'a> {
671    pub fn property(mut self, name: &str, data_type: DataType) -> Self {
672        self.inner = self.inner.property(name, data_type);
673        self
674    }
675
676    pub fn property_nullable(mut self, name: &str, data_type: DataType) -> Self {
677        self.inner = self.inner.property_nullable(name, data_type);
678        self
679    }
680
681    pub fn vector(mut self, name: &str, dimensions: usize) -> Self {
682        self.inner = self.inner.vector(name, dimensions);
683        self
684    }
685
686    pub fn done(self) -> SchemaBuilderSync<'a> {
687        SchemaBuilderSync {
688            inner: self.inner.done(),
689            rt: self.rt,
690        }
691    }
692
693    pub fn label(self, name: &str) -> LabelBuilderSync<'a> {
694        self.done().label(name)
695    }
696
697    pub fn apply(self) -> Result<()> {
698        self.rt.block_on(self.inner.apply())
699    }
700}
701
702pub struct EdgeTypeBuilderSync<'a> {
703    inner: crate::api::schema::EdgeTypeBuilder<'a>,
704    rt: &'a tokio::runtime::Runtime,
705}
706
707impl<'a> EdgeTypeBuilderSync<'a> {
708    pub fn property(mut self, name: &str, data_type: DataType) -> Self {
709        self.inner = self.inner.property(name, data_type);
710        self
711    }
712
713    pub fn property_nullable(mut self, name: &str, data_type: DataType) -> Self {
714        self.inner = self.inner.property_nullable(name, data_type);
715        self
716    }
717
718    pub fn done(self) -> SchemaBuilderSync<'a> {
719        SchemaBuilderSync {
720            inner: self.inner.done(),
721            rt: self.rt,
722        }
723    }
724
725    pub fn apply(self) -> Result<()> {
726        self.rt.block_on(self.inner.apply())
727    }
728}