from typing import List, Dict, Any, Optional
import logging
# Optional OpenTelemetry support: record availability once at import time so
# the rest of the module can cheaply guard span creation on HAS_OTEL.
try:
    from opentelemetry import trace
except ImportError:
    HAS_OTEL = False
else:
    HAS_OTEL = True

# Module-level logger, per stdlib convention.
logger = logging.getLogger(__name__)
class VersionedPineconeStore:
    """Pinecone-backed vector store pinned to a specific lakeFS commit.

    Every vector upserted through this store is tagged with its lakeFS
    repository, commit, and namespace, and every query is automatically
    filtered to the store's commit, so retrieval only ever sees the data
    version this store was created for.

    If the ``pinecone`` package is missing or credentials are not supplied,
    the store runs in "mock mode": operations log what they *would* do and
    return plausible placeholder results instead of touching a real index.
    """

    def __init__(
        self,
        index_name: str,
        lakefs_repository: str,
        lakefs_commit: str,
        api_key: Optional[str] = None,
        environment: Optional[str] = None,
        namespace: Optional[str] = None
    ):
        """Connect to a Pinecone index, degrading to mock mode on any failure.

        Args:
            index_name: Name of the Pinecone index to use.
            lakefs_repository: lakeFS repository the vectors originate from.
            lakefs_commit: lakeFS commit id this store is pinned to.
            api_key: Pinecone API key; mock mode if omitted.
            environment: Pinecone environment; mock mode if omitted.
            namespace: Pinecone namespace (defaults to "default").
        """
        self.index_name = index_name
        self.lakefs_repository = lakefs_repository
        self.lakefs_commit = lakefs_commit
        self.namespace = namespace or "default"
        try:
            # Imported lazily so the class is importable (and usable in mock
            # mode) without the optional pinecone dependency installed.
            import pinecone
            if api_key and environment:
                pinecone.init(api_key=api_key, environment=environment)
                self.index = pinecone.Index(index_name)
                self._has_pinecone = True
            else:
                logger.warning("Pinecone credentials not provided, using mock mode")
                self.index = None
                self._has_pinecone = False
        except ImportError:
            logger.warning("Pinecone not installed, using mock mode")
            self.index = None
            self._has_pinecone = False
        except Exception as e:
            # Any other init failure (bad key, network, ...) also degrades
            # to mock mode rather than crashing the caller.
            logger.error(f"Failed to initialize Pinecone: {e}")
            self.index = None
            self._has_pinecone = False
        # Tracer is optional; None disables span creation everywhere below.
        self._tracer = trace.get_tracer(__name__) if HAS_OTEL else None

    def _start_span(self, name: str):
        """Start and return an OTel span, or None when tracing is disabled."""
        # Check self._tracer first: when it is None the HAS_OTEL global is
        # never evaluated, so this is safe even if the OTel import failed.
        if self._tracer and HAS_OTEL:
            return self._tracer.start_span(name)
        return None

    def upsert_batch(
        self,
        embeddings: List[List[float]],
        document_ids: List[str],
        metadata: List[Dict[str, Any]]
    ) -> Dict[str, int]:
        """Upsert a batch of vectors, enriching metadata with lakeFS lineage.

        Args:
            embeddings: One embedding vector per document.
            document_ids: Vector ids, parallel to ``embeddings``.
            metadata: Per-document metadata dicts, parallel to ``embeddings``.

        Returns:
            ``{"upserted_count": <int>}``.

        Raises:
            ValueError: If the three input lists differ in length. (The
                previous implementation silently truncated via ``zip``,
                losing vectors without any error.)
        """
        # Fail loudly on mismatched inputs instead of silently dropping data.
        if not (len(embeddings) == len(document_ids) == len(metadata)):
            raise ValueError(
                "embeddings, document_ids and metadata must have the same length "
                f"(got {len(embeddings)}, {len(document_ids)}, {len(metadata)})"
            )
        span = self._start_span("rag.vector_store.upsert")
        try:
            if span:
                span.set_attribute("rag.vector_store.type", "pinecone")
                span.set_attribute("rag.vector_store.upsert_count", len(embeddings))
                span.set_attribute("rag.index.name", self.index_name)
                span.set_attribute("rag.index.version", self.lakefs_commit)
            # Tag every vector with its data lineage so queries can later be
            # filtered to an exact data version.
            enriched_metadata = []
            for meta in metadata:
                meta_copy = meta.copy()  # never mutate the caller's dicts
                meta_copy['lakefs_repository'] = self.lakefs_repository
                meta_copy['lakefs_commit'] = self.lakefs_commit
                meta_copy['lakefs_namespace'] = self.namespace
                enriched_metadata.append(meta_copy)
            if not self._has_pinecone or not self.index:
                logger.info(f"Mock mode: Would upsert {len(embeddings)} vectors to Pinecone")
                return {"upserted_count": len(embeddings)}
            vectors = list(zip(document_ids, embeddings, enriched_metadata))
            response = self.index.upsert(
                vectors=vectors,
                namespace=self.namespace
            )
            # Older client versions may not expose upserted_count on the response.
            return {"upserted_count": response.upserted_count if hasattr(response, 'upserted_count') else len(vectors)}
        finally:
            if span:
                span.end()

    def query(
        self,
        query_embedding: List[float],
        top_k: int = 5,
        filter: Optional[Dict[str, Any]] = None,
        include_metadata: bool = True
    ) -> List[Dict[str, Any]]:
        """Query the index, restricted to this store's lakeFS commit.

        Args:
            query_embedding: The query vector.
            top_k: Maximum number of matches to return.
            filter: Optional extra Pinecone metadata filter. The caller's
                dict is copied, never mutated.
            include_metadata: Whether to include match metadata.

        Returns:
            A list of ``{"id", "score", "metadata"}`` dicts (up to three
            deterministic placeholder matches in mock mode).
        """
        span = self._start_span("rag.vector_store.query")
        try:
            if span:
                span.set_attribute("rag.vector_store.type", "pinecone")
                span.set_attribute("rag.vector_store.query_top_k", top_k)
                span.set_attribute("rag.index.version", self.lakefs_commit)
            # Copy the caller's filter (bug fix: the old code wrote
            # 'lakefs_commit' into the caller's dict), then pin the query
            # to this store's commit.
            query_filter = dict(filter) if filter else {}
            query_filter['lakefs_commit'] = self.lakefs_commit
            if not self._has_pinecone or not self.index:
                logger.info(f"Mock mode: Would query Pinecone with top_k={top_k}")
                return [
                    {
                        "id": f"mock_doc_{i}",
                        "score": 0.95 - (i * 0.05),
                        "metadata": {
                            "lakefs_commit": self.lakefs_commit,
                            "lakefs_repository": self.lakefs_repository
                        }
                    }
                    for i in range(min(top_k, 3))
                ]
            results = self.index.query(
                vector=query_embedding,
                top_k=top_k,
                filter=query_filter,
                include_metadata=include_metadata,
                namespace=self.namespace
            )
            matches = results.get('matches', [])
            if span:
                span.set_attribute("rag.retrieval.count", len(matches))
            return [
                {
                    "id": match.id,
                    "score": match.score,
                    "metadata": match.metadata if hasattr(match, 'metadata') else {}
                }
                for match in matches
            ]
        finally:
            if span:
                span.end()

    def delete(self, ids: List[str]) -> Dict[str, int]:
        """Delete vectors by id.

        Returns:
            ``{"deleted_count": <int>}`` — the number of ids requested;
            Pinecone's delete response carries no count of its own.
        """
        if not self._has_pinecone or not self.index:
            logger.info(f"Mock mode: Would delete {len(ids)} vectors from Pinecone")
            return {"deleted_count": len(ids)}
        self.index.delete(ids=ids, namespace=self.namespace)
        return {"deleted_count": len(ids)}

    def describe_index_stats(self) -> Dict[str, Any]:
        """Return basic index statistics (placeholder values in mock mode)."""
        if not self._has_pinecone or not self.index:
            # NOTE(review): 3072 is a hard-coded placeholder dimension; the
            # true embedding dimension is unknown in mock mode — confirm
            # against the embedding model used by callers.
            return {
                "total_vector_count": 0,
                "dimension": 3072,
                "index_fullness": 0.0
            }
        stats = self.index.describe_index_stats()
        return {
            "total_vector_count": stats.total_vector_count,
            "dimension": stats.dimension,
            "index_fullness": stats.index_fullness if hasattr(stats, 'index_fullness') else 0.0
        }