---
source: crates/nautilus-codegen/tests/snapshot_tests.rs
expression: "&client_sync"
---
"""Generated Nautilus client with model delegates.
This file is auto-generated by nautilus-codegen.
Do not edit manually.
"""
from __future__ import annotations
from typing import Awaitable, Callable, TypeVar
from ._internal.client import NautilusClient as BaseClient, _AsyncTransactionContext, _SyncTransactionContext
from ._internal.transaction import IsolationLevel
# Result type of a transaction callback; parametrizes the callback annotations below.
T = TypeVar("T")
from .models.post import PostDelegate
from .models.user import UserDelegate
# Schema path string passed to the base client constructor by Nautilus.__init__.
_SCHEMA_PATH = "schema.nautilus"
class Nautilus(BaseClient):
    """Generated client that exposes one delegate attribute per model.

    Each delegate is stored on the instance (``post``, ``user``) and is also
    registered with the base client under the same name.
    """

    def __init__(self, migrate: bool = False, auto_register: bool = False) -> None:
        """Build the client and attach the generated model delegates.

        Args:
            migrate: When True, DDL migrations are run on engine startup.
            auto_register: When True, the client is registered globally for
                ``Model.nautilus`` access.
        """
        super().__init__(_SCHEMA_PATH, migrate=migrate, auto_register=auto_register)

        # Delegates are created and registered one model at a time, in the
        # same order the generator emitted them.
        post_delegate = PostDelegate(self)
        self.post = post_delegate
        self.register_delegate("post", post_delegate)

        user_delegate = UserDelegate(self)
        self.user = user_delegate
        self.register_delegate("user", user_delegate)

    def transaction(
        self,
        callback: Callable[["Nautilus"], Awaitable[T]] | None = None,
        *,
        timeout_ms: int = 5000,
        isolation_level: IsolationLevel | None = None,
    ) -> "_AsyncTransactionContext[Nautilus]":
        """Execute operations within a database transaction.

        Two calling conventions are supported: an **async context manager**
        and a **callback-style transaction**.

        Interactive (context-manager) form::

            async with client.transaction() as tx:
                user = await tx.user.create({"name": "Alice"})
                await tx.post.create({"title": "Hello", "authorId": user.id})

        Callback form::

            async def work(tx):
                user = await tx.user.create({"name": "Bob"})
                return user

            user = await client.transaction(work, timeout_ms=10000)

        Args:
            callback: Optional async callable that receives the
                transaction-scoped client. When given, the transaction is
                committed if it succeeds and rolled back if it raises, and
                the callback's return value is handed back.
            timeout_ms: Server-side transaction timeout in milliseconds
                (5000 by default).
            isolation_level: Optional SQL isolation level.

        Returns:
            With *callback*: an awaitable coroutine.
            Without *callback*: an ``_AsyncTransactionContext``.
        """
        # The base class does the work; this override only narrows the
        # declared return type to the generated client.
        outcome = super().transaction(
            callback,
            timeout_ms=timeout_ms,
            isolation_level=isolation_level,
        )
        return outcome  # type: ignore[return-value]

    def connect(self) -> None:
        """Open the engine connection synchronously."""
        self._sync_connect()

    def disconnect(self) -> None:
        """Close the engine connection synchronously."""
        self._sync_disconnect()

    def __enter__(self) -> "Nautilus":
        """Enter the synchronous context manager: connect, then yield this client."""
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Leave the synchronous context manager by disconnecting from the engine."""
        self.disconnect()

    def sync_transaction(
        self,
        callback: Callable[["Nautilus"], T] | None = None,
        *,
        timeout_ms: int = 5000,
        isolation_level: IsolationLevel | None = None,
    ) -> "_SyncTransactionContext[Nautilus]":
        """Begin a synchronous transaction and return a typed context manager.

        All arguments are forwarded unchanged to the base client's
        ``sync_transaction``.

        Args:
            callback: Optional synchronous callable taking the client.
            timeout_ms: Transaction timeout in milliseconds (5000 by default).
            isolation_level: Optional SQL isolation level.
        """
        outcome = super().sync_transaction(
            callback,
            timeout_ms=timeout_ms,
            isolation_level=isolation_level,
        )
        return outcome  # type: ignore[return-value]