import os
import shutil
import subprocess
import sys
import tempfile
import prollytree
def setup_example_repo():
    """Create a throwaway git repository with an empty ``dataset`` directory.

    A fresh temporary directory is created, initialised with ``git init``,
    given a local ``user.name`` / ``user.email`` so commits do not depend on
    the machine's global git configuration, and populated with a ``dataset``
    subdirectory.

    Returns:
        tuple[str, str]: ``(tmpdir, dataset_dir)`` where ``tmpdir`` is the
        repository root and ``dataset_dir`` is ``<tmpdir>/dataset``. The
        caller owns ``tmpdir`` and is responsible for removing it.

    Raises:
        subprocess.CalledProcessError: If a git command exits non-zero.
        OSError: If ``git`` cannot be launched (e.g. it is not installed) or
            the dataset directory cannot be created.
    """
    tmpdir = tempfile.mkdtemp(prefix="prollytree_text_example_")
    print(f"Created temporary directory: {tmpdir}")

    git_commands = (
        ["git", "init"],
        ["git", "config", "user.name", "Example User"],
        ["git", "config", "user.email", "user@example.com"],
    )
    try:
        for command in git_commands:
            subprocess.run(command, cwd=tmpdir, check=True, capture_output=True)

        dataset_dir = os.path.join(tmpdir, "dataset")
        os.makedirs(dataset_dir, exist_ok=True)
    except BaseException:
        # Callers only start their own try/finally cleanup after this function
        # returns, so a failure here would otherwise leak the directory.
        shutil.rmtree(tmpdir, ignore_errors=True)
        raise

    return tmpdir, dataset_dir
def demo_basic_text_index():
    """Demo 1: dual-write documents to the primary tree and a text index.

    Each document is inserted into the ``personal`` namespace and into the
    ``docs`` text index under the same key, committed once, and then searched.
    For every hit the document body is read back from the primary namespace.
    The temporary repository is always removed on exit.
    """
    print("\nDemo 1: Open + dual-write + search (with primary-tree lookup)")
    print("=" * 60)

    repo_root, data_path = setup_example_repo()
    try:
        kv = prollytree.NamespacedKvStore(data_path)
        hasher = prollytree.HashEmbedder(dim=64, seed=0)
        kv.text_index_open("personal", "docs", hasher)

        corpus = (
            (b"doc:1", "the quick brown fox jumps over the lazy dog"),
            (b"doc:2", "rust is a systems programming language"),
            (b"doc:3", "merkle trees enable verifiable data structures"),
            (b"doc:4", "the fox and the hound are forest friends"),
        )
        for key, body in corpus:
            # Same key goes to both sides: bytes to the primary namespace,
            # the raw text to the index.
            kv.ns_insert("personal", key, body.encode())
            kv.text_index_insert("personal", "docs", key, body)
        kv.commit("seed corpus + index")

        print(f"Primary holds {len(kv.ns_list_keys('personal'))} documents")
        print(f"Index holds {kv.text_index_len('personal', 'docs')} documents")

        matches = kv.text_index_search(
            "personal", "docs", "the quick brown fox", k=2
        )
        print(f"\nQuery: 'the quick brown fox' -> top {len(matches)}")
        for key, distance in matches:
            # Search yields (key, score) pairs; the body comes from the
            # primary namespace.
            stored = kv.ns_get("personal", key).decode()
            print(f" {key!r} distance={distance:.4f} body={stored!r}")
    finally:
        shutil.rmtree(repo_root, ignore_errors=True)
def demo_cascade_mode():
    """Demo 2: let primary-namespace writes drive the text index via cascade.

    After ``set_cascade`` is configured for the ``notes`` namespace, only
    ``ns_insert`` / ``ns_delete`` are called; the demo then searches the
    ``by_body`` index and reports its length to show it tracked those writes.
    Finally the cascade is cleared and the resulting configuration printed.
    The temporary repository is always removed on exit.
    """
    print("\nDemo 2: Cascade — text index follows primary writes")
    print("=" * 60)

    repo_root, data_path = setup_example_repo()
    try:
        kv = prollytree.NamespacedKvStore(data_path)
        hasher = prollytree.HashEmbedder(dim=64, seed=0)
        kv.text_index_open("notes", "by_body", hasher)
        kv.set_cascade("notes", ["by_body"])

        notes = (
            (b"note:1", b"meeting with the platform team"),
            (b"note:2", b"draft proposal for Q3 roadmap"),
        )
        for key, body in notes:
            # No explicit text_index_insert here: the cascade is expected to
            # do the indexing.
            kv.ns_insert("notes", key, body)
        kv.commit("cascade-driven indexing")

        matches = kv.text_index_search("notes", "by_body", "platform meeting", k=2)
        print("Cascade-indexed search results:")
        for key, distance in matches:
            print(f" {key!r} distance={distance:.4f}")

        kv.ns_delete("notes", b"note:1")
        kv.commit("cascade-driven delete")
        print(
            f"\nAfter ns_delete('note:1'): index now holds "
            f"{kv.text_index_len('notes', 'by_body')} document(s)"
        )

        kv.clear_cascade("notes")
        print(f"Cascade cleared: {kv.cascade_for_namespace('notes')}")
    finally:
        shutil.rmtree(repo_root, ignore_errors=True)
def demo_multi_chunk_with_line_chunker():
    """Demo 3: index one multi-line document with the ``"line"`` chunker.

    A five-line log is inserted as a single document into an index opened
    with ``chunker="line"``. The demo prints both the document count and the
    chunk count, then runs a search whose results are reported per document.
    The temporary repository is always removed on exit.
    """
    print("\nDemo 3: Multi-chunk indexing via LineChunker")
    print("=" * 60)

    repo_root, data_path = setup_example_repo()
    try:
        kv = prollytree.NamespacedKvStore(data_path)
        hasher = prollytree.HashEmbedder(dim=64, seed=0)
        kv.text_index_open("logs", "by_line", hasher, chunker="line")

        # One document, several newline-terminated lines.
        log_text = (
            "2026-05-20T09:00 startup: loading config\n"
            "2026-05-20T09:01 startup: bound port 8080\n"
            "2026-05-20T09:42 error: database timeout after 30s\n"
            "2026-05-20T09:43 retry: reconnecting to database\n"
            "2026-05-20T09:43 recovery: database connection restored\n"
        )
        kv.text_index_insert("logs", "by_line", b"log:2026-05-20", log_text)
        kv.commit("ingest log file")

        print(
            f"len = {kv.text_index_len('logs', 'by_line')} document, "
            f"chunk_count = {kv.text_index_chunk_count('logs', 'by_line')} chunks"
        )

        matches = kv.text_index_search("logs", "by_line", "database timeout", k=3)
        print("Query: 'database timeout' (dedupped by document):")
        for key, distance in matches:
            print(f" {key!r} distance={distance:.4f}")
    finally:
        shutil.rmtree(repo_root, ignore_errors=True)
def demo_audit_drift():
    """Demo 4: create primary/index drift on purpose, audit it, purge orphans.

    One key is written only to the primary namespace and another only to the
    text index. ``audit_text_index`` is then called and three fields of its
    report are printed, followed by ``purge_text_index_orphans`` and a commit
    of the repair. The temporary repository is always removed on exit.
    """
    print("\nDemo 4: Drift detection via audit_text_index")
    print("=" * 60)

    repo_root, data_path = setup_example_repo()
    try:
        kv = prollytree.NamespacedKvStore(data_path)
        hasher = prollytree.HashEmbedder(dim=64, seed=0)
        kv.text_index_open("personal", "docs", hasher)

        # Side 1 of the drift: present in the primary tree, never indexed.
        kv.ns_insert("personal", b"doc:in-primary-only", b"only in the primary tree")
        kv.commit("primary write without indexing")

        # Side 2 of the drift: indexed, but with no primary record.
        kv.text_index_insert(
            "personal", "docs", b"doc:in-index-only", "only in the index"
        )
        kv.commit("index write without primary")

        audit = kv.audit_text_index("personal", "docs")
        print(f"is_in_sync : {audit['is_in_sync']}")
        print(f"orphans_in_index : {audit['orphans_in_index']}")
        print(f"missing_from_index: {audit['missing_from_index']}")

        purged = kv.purge_text_index_orphans("personal", "docs")
        print(f"\npurge_text_index_orphans removed {purged} orphan(s)")
        kv.commit("repair: purge orphans")
    finally:
        shutil.rmtree(repo_root, ignore_errors=True)
def demo_callable_embedder():
    """Demo 5: plug a plain Python function in as the embedder.

    A small character-sum function is wrapped in ``CallableEmbedder`` with an
    explicit id, version and dimension. Two documents are indexed with it,
    the embedder's identity attributes are printed, and a single-result
    search is run. The temporary repository is always removed on exit.
    """
    print("\nDemo 5: CallableEmbedder — bring your own embedding function")
    print("=" * 60)

    def toy_embed(text: str):
        """Fold character codes into 8 buckets, each code scaled by 1/256."""
        buckets = [0.0] * 8
        for position, char in enumerate(text):
            buckets[position % 8] += float(ord(char)) / 256.0
        return buckets

    repo_root, data_path = setup_example_repo()
    try:
        kv = prollytree.NamespacedKvStore(data_path)
        # dim must match the length of the list toy_embed returns (8).
        custom = prollytree.CallableEmbedder(
            id="user:toy-char-sum",
            version="v1",
            dim=8,
            embed_fn=toy_embed,
        )
        kv.text_index_open("personal", "docs", custom)

        for key, body in ((b"doc:a", "alpha document"), (b"doc:b", "beta document")):
            kv.text_index_insert("personal", "docs", key, body)
        kv.commit("seed with custom embedder")

        print("Embedder identity is persisted with the index:")
        print(f" id = {custom.id}")
        print(f" version = {custom.version}")
        print(f" dim = {custom.dim}")

        matches = kv.text_index_search("personal", "docs", "alpha document", k=1)
        print(f"\nSearch 'alpha document' -> {matches!r}")
    finally:
        shutil.rmtree(repo_root, ignore_errors=True)
def demo_minilm_embedder():
    """Demo 6: semantic search with the bundled ``MiniLmEmbedder``.

    Skipped (with a message) when ``prollytree.proximity_text_available`` is
    missing or falsy. Otherwise three book titles are indexed in the
    ``library`` / ``books`` index and a two-result search is printed. The
    temporary repository is always removed on exit.
    """
    text_feature_enabled = getattr(prollytree, "proximity_text_available", False)
    if not text_feature_enabled:
        print("\nDemo 6: MiniLmEmbedder (skipped — wheel built without proximity_text)")
        return

    print("\nDemo 6: MiniLmEmbedder — bundled Candle + all-MiniLM-L6-v2")
    print("=" * 60)
    print(
        "First call downloads ~90 MB of weights into "
        "$PROLLYTREE_EMBEDDER_CACHE (default ~/.cache/prollytree). "
        "Subsequent runs reuse the cache."
    )

    repo_root, data_path = setup_example_repo()
    try:
        kv = prollytree.NamespacedKvStore(data_path)
        minilm = prollytree.MiniLmEmbedder()
        print(f"Embedder: {minilm!r}")
        kv.text_index_open("library", "books", minilm)

        shelf = (
            (b"book:1", "a treatise on probabilistic data structures"),
            (b"book:2", "introduction to systems programming in rust"),
            (b"book:3", "the architecture of distributed databases"),
        )
        for key, title in shelf:
            kv.text_index_insert("library", "books", key, title)
        kv.commit("seed library")

        matches = kv.text_index_search(
            "library", "books", "approximate nearest neighbour search", k=2
        )
        print("Semantic search results:")
        for key, distance in matches:
            print(f" {key!r} distance={distance:.4f}")
    finally:
        shutil.rmtree(repo_root, ignore_errors=True)
def main():
    """Run every text-index demo in order and print a summary.

    Prints the ``proximity_available`` / ``proximity_text_available`` flags
    read from the ``prollytree`` module. If ``proximity_available`` is missing
    or falsy, rebuild instructions are printed and the process exits with
    status 1. A ``KeyboardInterrupt`` or any other exception raised while the
    demos run is reported (with a traceback for the latter) and also exits
    with status 1.
    """
    print("Text indexing on NamespacedKvStore")
    print("=" * 60)

    proximity_available = getattr(prollytree, "proximity_available", False)
    proximity_text_available = getattr(prollytree, "proximity_text_available", False)
    print(f"proximity_available = {proximity_available}")
    print(f"proximity_text_available = {proximity_text_available}")

    if not proximity_available:
        rebuild_help = (
            "\nThe installed `prollytree` wheel was built without the 'proximity'",
            "feature. Rebuild and reinstall with one of:",
            " ./python/build_python.sh --all-features --install",
            " maturin develop --features 'python git proximity proximity_text'",
        )
        for line in rebuild_help:
            print(line)
        sys.exit(1)

    demos = (
        demo_basic_text_index,
        demo_cascade_mode,
        demo_multi_chunk_with_line_chunker,
        demo_audit_drift,
        demo_callable_embedder,
        demo_minilm_embedder,
    )
    takeaways = (
        "- Primary KV tree is the source of truth; text index stores only",
        " (id, vector) pairs. Write document bodies to BOTH (demo 1) or",
        " use set_cascade to auto-mirror primary writes into the index (demo 2).",
        "- text_index_open(ns, idx, embedder, chunker) creates or re-opens.",
        "- chunker='line' splits one document into many chunks; search dedups.",
        "- audit_text_index + purge_text_index_orphans repair drift.",
        "- HashEmbedder for tests; CallableEmbedder for any Python function;",
        " MiniLmEmbedder for bundled semantic search.",
    )

    try:
        for run_demo in demos:
            run_demo()

        print("\nAll demos completed successfully.")
        print("\nKey takeaways:")
        for line in takeaways:
            print(line)
    except KeyboardInterrupt:
        print("\nExample interrupted by user.")
        sys.exit(1)
    except Exception as exc:
        print(f"\nExample failed: {exc}")
        # Imported only on the failure path, where it is needed.
        import traceback

        traceback.print_exc()
        sys.exit(1)
# Run the demos only when executed as a script, not when imported.
if __name__ == "__main__":
    main()