# briefcase-python 2.4.1
# Python bindings for Briefcase AI
# (package-page header preserved as comments; was bare text that broke parsing)
"""
Pinecone vector database adapter with lakeFS versioning.
"""

from typing import List, Dict, Any, Optional
import logging

try:
    from opentelemetry import trace
    HAS_OTEL = True
except ImportError:
    HAS_OTEL = False

logger = logging.getLogger(__name__)


class VersionedPineconeStore:
    """
    Pinecone vector store with lakeFS version tracking.

    Every vector upserted through this store is tagged with the lakeFS
    repository, commit, and namespace so query results can be traced back to
    the exact data version they were built from. Queries are automatically
    filtered to the store's pinned ``lakefs_commit``.

    Degrades to a no-op "mock mode" (logged at WARNING) when the ``pinecone``
    package is missing or credentials are not supplied, so the surrounding
    pipeline can run without a live index.
    """

    def __init__(
        self,
        index_name: str,
        lakefs_repository: str,
        lakefs_commit: str,
        api_key: Optional[str] = None,
        environment: Optional[str] = None,
        namespace: Optional[str] = None
    ):
        """
        Args:
            index_name: Name of the Pinecone index to connect to.
            lakefs_repository: lakeFS repository the source data lives in.
            lakefs_commit: lakeFS commit ID the vectors are derived from.
            api_key: Pinecone API key; mock mode is used if omitted.
            environment: Pinecone environment; mock mode is used if omitted.
            namespace: Pinecone namespace; defaults to "default".
        """
        self.index_name = index_name
        self.lakefs_repository = lakefs_repository
        self.lakefs_commit = lakefs_commit
        self.namespace = namespace or "default"

        # Default to mock mode up front so a partial failure inside the try
        # block can never leave the instance half-initialized.
        self.index = None
        self._has_pinecone = False
        try:
            # Imported lazily so pinecone remains an optional dependency.
            import pinecone
            if api_key and environment:
                pinecone.init(api_key=api_key, environment=environment)
                self.index = pinecone.Index(index_name)
                self._has_pinecone = True
            else:
                logger.warning("Pinecone credentials not provided, using mock mode")
        except ImportError:
            logger.warning("Pinecone not installed, using mock mode")
        except Exception as e:
            # Any other init failure (bad key, network, etc.) degrades to
            # mock mode rather than crashing the caller.
            logger.error("Failed to initialize Pinecone: %s", e)
            self.index = None
            self._has_pinecone = False

        # Tracing is optional; every method checks for a None tracer.
        self._tracer = trace.get_tracer(__name__) if HAS_OTEL else None

    def _start_span(self, name: str):
        """Start an OTel span, or return None when tracing is unavailable."""
        if self._tracer is not None:
            return self._tracer.start_span(name)
        return None

    def upsert_batch(
        self,
        embeddings: List[List[float]],
        document_ids: List[str],
        metadata: List[Dict[str, Any]]
    ) -> Dict[str, int]:
        """
        Upsert embeddings, attaching lakeFS version metadata to each vector.

        Args:
            embeddings: List of embedding vectors.
            document_ids: List of document IDs (parallel to ``embeddings``).
            metadata: List of metadata dicts, one per document.

        Returns:
            Dict with upsert statistics (``upserted_count``).

        Raises:
            ValueError: If the three input lists are not the same length.
        """
        # zip() would silently drop trailing items on mismatched inputs,
        # corrupting the index without any error — fail loudly instead.
        if not (len(embeddings) == len(document_ids) == len(metadata)):
            raise ValueError(
                f"Length mismatch: {len(embeddings)} embeddings, "
                f"{len(document_ids)} document_ids, {len(metadata)} metadata entries"
            )

        span = self._start_span("rag.vector_store.upsert")
        try:
            if span:
                span.set_attribute("rag.vector_store.type", "pinecone")
                span.set_attribute("rag.vector_store.upsert_count", len(embeddings))
                span.set_attribute("rag.index.name", self.index_name)
                span.set_attribute("rag.index.version", self.lakefs_commit)

            # Enrich copies of the caller's metadata with the lakeFS version;
            # never mutate the dicts the caller passed in.
            enriched_metadata = [
                {
                    **meta,
                    'lakefs_repository': self.lakefs_repository,
                    'lakefs_commit': self.lakefs_commit,
                    'lakefs_namespace': self.namespace,
                }
                for meta in metadata
            ]

            if not self._has_pinecone or not self.index:
                # Mock mode: report success without touching any backend.
                logger.info("Mock mode: Would upsert %d vectors to Pinecone", len(embeddings))
                return {"upserted_count": len(embeddings)}

            # Pinecone accepts (id, values, metadata) tuples.
            vectors = list(zip(document_ids, embeddings, enriched_metadata))
            response = self.index.upsert(
                vectors=vectors,
                namespace=self.namespace
            )

            # Older client responses may lack the attribute; fall back to input size.
            return {"upserted_count": getattr(response, 'upserted_count', len(vectors))}

        finally:
            if span:
                span.end()

    def query(
        self,
        query_embedding: List[float],
        top_k: int = 5,
        filter: Optional[Dict[str, Any]] = None,
        include_metadata: bool = True
    ) -> List[Dict[str, Any]]:
        """
        Query the index, pinned to this store's lakeFS commit.

        Args:
            query_embedding: Query vector.
            top_k: Number of results to return.
            filter: Additional Pinecone metadata filters (not modified).
            include_metadata: Whether to include metadata in results.

        Returns:
            List of dicts with ``id``, ``score``, and ``metadata`` keys.
        """
        span = self._start_span("rag.vector_store.query")
        try:
            if span:
                span.set_attribute("rag.vector_store.type", "pinecone")
                span.set_attribute("rag.vector_store.query_top_k", top_k)
                span.set_attribute("rag.index.version", self.lakefs_commit)

            # Copy before augmenting so the caller's dict is never mutated,
            # then pin results to this store's commit for version isolation.
            version_filter = dict(filter) if filter else {}
            version_filter['lakefs_commit'] = self.lakefs_commit

            if not self._has_pinecone or not self.index:
                # Mock mode: deterministic placeholder matches.
                logger.info("Mock mode: Would query Pinecone with top_k=%d", top_k)
                return [
                    {
                        "id": f"mock_doc_{i}",
                        "score": 0.95 - (i * 0.05),
                        "metadata": {
                            "lakefs_commit": self.lakefs_commit,
                            "lakefs_repository": self.lakefs_repository
                        }
                    }
                    for i in range(min(top_k, 3))
                ]

            results = self.index.query(
                vector=query_embedding,
                top_k=top_k,
                filter=version_filter,
                include_metadata=include_metadata,
                namespace=self.namespace
            )

            matches = results.get('matches', [])

            if span:
                span.set_attribute("rag.retrieval.count", len(matches))

            return [
                {
                    "id": match.id,
                    "score": match.score,
                    "metadata": getattr(match, 'metadata', {})
                }
                for match in matches
            ]

        finally:
            if span:
                span.end()

    def delete(self, ids: List[str]) -> Dict[str, int]:
        """Delete vectors by IDs; returns a ``deleted_count`` dict."""
        if not self._has_pinecone or not self.index:
            logger.info("Mock mode: Would delete %d vectors from Pinecone", len(ids))
            return {"deleted_count": len(ids)}

        self.index.delete(ids=ids, namespace=self.namespace)
        return {"deleted_count": len(ids)}

    def describe_index_stats(self) -> Dict[str, Any]:
        """Return index statistics (vector count, dimension, fullness)."""
        if not self._has_pinecone or not self.index:
            # Mock stats; 3072 matches common large-embedding dimensions
            # (e.g. text-embedding-3-large) — TODO confirm against producer.
            return {
                "total_vector_count": 0,
                "dimension": 3072,
                "index_fullness": 0.0
            }

        stats = self.index.describe_index_stats()
        return {
            "total_vector_count": stats.total_vector_count,
            "dimension": stats.dimension,
            "index_fullness": getattr(stats, 'index_fullness', 0.0)
        }