use crate::rocksdb_storage::{SessionCheckpoint, StoredWorkspace};
use anyhow::Result;
use async_trait::async_trait;
use post_cortex_core::core::context_update::{
ContextUpdate, EntityData, EntityRelationship, RelationType,
};
use post_cortex_core::graph::entity_graph::EntityNetwork;
use post_cortex_core::session::active_session::ActiveSession;
use post_cortex_core::workspace::SessionRole;
use post_cortex_embeddings::{SearchMatch, VectorMetadata};
use post_cortex_proto::pb::{CascadeInvalidateReport, FreshnessEntry, SourceReference, SymbolId};
use uuid::Uuid;
/// A collection of freshness results, one [`FreshnessEntry`] per checked entry.
#[derive(Clone, Debug)]
pub struct FreshnessReportExt {
    /// The individual freshness results.
    pub entries: Vec<FreshnessEntry>,
}
/// Identifies a stored entry reported as stale for a source file, together with the
/// optional symbol details recorded for it.
#[derive(Clone, Debug)]
pub struct StaleEntryInfo {
    /// Id of the stale entry.
    pub entry_id: String,
    /// Name of the associated symbol, if one is known.
    pub symbol_name: Option<String>,
    /// Kind of the associated symbol, if one is known (free-form string).
    pub symbol_type: Option<String>,
}
/// Core persistence interface: sessions, their context updates, checkpoints, workspaces and
/// engine maintenance. Every implementor must also implement [`FreshnessStorage`].
#[async_trait]
pub trait Storage: FreshnessStorage + Send + Sync {
    /// Persists `session`.
    async fn save_session(&self, session: &ActiveSession) -> Result<()>;
    /// Loads the session with id `session_id`.
    async fn load_session(&self, session_id: Uuid) -> Result<ActiveSession>;
    /// Deletes the session with id `session_id`.
    async fn delete_session(&self, session_id: Uuid) -> Result<()>;
    /// Clears the entities stored for `session_id`.
    async fn clear_session_entities(&self, session_id: Uuid) -> Result<()>;
    /// Returns the ids of all stored sessions.
    async fn list_sessions(&self) -> Result<Vec<Uuid>>;
    /// Reports whether a session with id `session_id` is stored.
    async fn session_exists(&self, session_id: Uuid) -> Result<bool>;
    /// Persists a batch of context updates for `session_id`.
    async fn batch_save_updates(&self, session_id: Uuid, updates: Vec<ContextUpdate>)
        -> Result<()>;

    /// Saves `session` and then its pending `updates`.
    ///
    /// The default issues two separate writes, [`save_session`](Self::save_session) followed
    /// by [`batch_save_updates`](Self::batch_save_updates). The second write is skipped when
    /// `updates` is empty. Because the writes are separate, the default is not atomic: if the
    /// second write fails, the session has already been saved. Engines may override this with
    /// a combined write. `session_id` is presumably the id of `session`; confirm at call sites.
    async fn save_session_with_updates(
        &self,
        session: &ActiveSession,
        session_id: Uuid,
        updates: Vec<ContextUpdate>,
    ) -> Result<()> {
        self.save_session(session).await?;
        if updates.is_empty() {
            return Ok(());
        }
        self.batch_save_updates(session_id, updates).await
    }

    /// Loads every context update stored for `session_id`.
    async fn load_session_updates(&self, session_id: Uuid) -> Result<Vec<ContextUpdate>>;
    /// Persists a session checkpoint.
    async fn save_checkpoint(&self, checkpoint: &SessionCheckpoint) -> Result<()>;
    /// Loads the checkpoint with id `checkpoint_id`.
    async fn load_checkpoint(&self, checkpoint_id: Uuid) -> Result<SessionCheckpoint>;
    /// Returns all stored checkpoints.
    async fn list_checkpoints(&self) -> Result<Vec<SessionCheckpoint>>;
    /// Persists a workspace's name, description and member session ids.
    async fn save_workspace_metadata(
        &self,
        workspace_id: Uuid,
        name: &str,
        description: &str,
        session_ids: &[Uuid],
    ) -> Result<()>;
    /// Deletes the workspace with id `workspace_id`.
    async fn delete_workspace(&self, workspace_id: Uuid) -> Result<()>;
    /// Returns all stored workspaces.
    async fn list_workspaces(&self) -> Result<Vec<StoredWorkspace>>;
    /// Adds `session_id` to `workspace_id` with the given `role`.
    async fn add_session_to_workspace(
        &self,
        workspace_id: Uuid,
        session_id: Uuid,
        role: SessionRole,
    ) -> Result<()>;
    /// Removes `session_id` from `workspace_id`.
    async fn remove_session_from_workspace(
        &self,
        workspace_id: Uuid,
        session_id: Uuid,
    ) -> Result<()>;
    /// Runs engine compaction / maintenance.
    async fn compact(&self) -> Result<()>;
    /// Returns the number of keys held by the engine.
    async fn get_key_count(&self) -> Result<usize>;
    /// Returns engine statistics as a string (format is engine-specific; presumably
    /// human-readable).
    async fn get_stats(&self) -> Result<String>;
}
/// Entity-graph operations layered on top of [`Storage`], each scoped to one session.
///
/// [`StorageBackend`] implements this trait for every engine. However,
/// [`StorageBackend::supports_native_graph`] reports `false` for RocksDB, so that engine
/// presumably answers these queries without a native graph store. Confirm in its
/// implementation.
#[async_trait]
pub trait GraphStorage: Storage {
    /// Stores `entity` for the session; by its name, inserts or updates.
    async fn upsert_entity(&self, session_id: Uuid, entity: &EntityData) -> Result<()>;
    /// Looks up the entity called `name`; presumably `Ok(None)` when it does not exist.
    async fn get_entity(&self, session_id: Uuid, name: &str) -> Result<Option<EntityData>>;
    /// Returns the entities stored for the session.
    async fn list_entities(&self, session_id: Uuid) -> Result<Vec<EntityData>>;
    /// Removes the entity called `name` from the session.
    async fn delete_entity(&self, session_id: Uuid, name: &str) -> Result<()>;
    /// Stores `relationship` in the session's graph.
    async fn create_relationship(
        &self,
        session_id: Uuid,
        relationship: &EntityRelationship,
    ) -> Result<()>;
    /// Names of entities related to `entity_name`. Presumably this covers every relation
    /// type; compare [`find_related_by_type`](Self::find_related_by_type).
    async fn find_related_entities(
        &self,
        session_id: Uuid,
        entity_name: &str,
    ) -> Result<Vec<String>>;
    /// Names of entities related to `entity_name` through `relation_type`.
    async fn find_related_by_type(
        &self,
        session_id: Uuid,
        entity_name: &str,
        relation_type: &RelationType,
    ) -> Result<Vec<String>>;
    /// Shortest path from `from` to `to` as a list of entity names. Presumably returns
    /// `Ok(None)` when no path exists; confirm.
    async fn find_shortest_path(
        &self,
        session_id: Uuid,
        from: &str,
        to: &str,
    ) -> Result<Option<Vec<String>>>;
    /// Entity network around `center`, bounded by `max_depth`. Presumably the bound is
    /// measured in hops; confirm.
    async fn get_entity_network(
        &self,
        session_id: Uuid,
        center: &str,
        max_depth: usize,
    ) -> Result<EntityNetwork>;
}
/// Embedding-vector storage and similarity search.
///
/// Unlike [`Storage`], sessions are identified here by `&str` ids, not [`Uuid`]. The
/// counting/query helpers (`has_session_embeddings`, `count_session_embeddings`,
/// `total_count`) return plain values, so they cannot report errors.
#[async_trait]
pub trait VectorStorage: Send + Sync {
    /// Stores one vector with its metadata and returns a `String` id. This is presumably
    /// the id accepted by [`remove_vector`](Self::remove_vector); confirm.
    async fn add_vector(&self, vector: Vec<f32>, metadata: VectorMetadata) -> Result<String>;
    /// Stores several vectors and returns one id per stored vector (presumably in input
    /// order; confirm).
    async fn add_vectors_batch(
        &self,
        vectors: Vec<(Vec<f32>, VectorMetadata)>,
    ) -> Result<Vec<String>>;
    /// Returns up to `k` matches for `query` across all stored vectors.
    async fn search(&self, query: &[f32], k: usize) -> Result<Vec<SearchMatch>>;
    /// Like [`search`](Self::search), restricted to vectors of `session_id`.
    async fn search_in_session(
        &self,
        query: &[f32],
        k: usize,
        session_id: &str,
    ) -> Result<Vec<SearchMatch>>;
    /// Like [`search`](Self::search), restricted to vectors of `content_type`.
    async fn search_by_content_type(
        &self,
        query: &[f32],
        k: usize,
        content_type: &str,
    ) -> Result<Vec<SearchMatch>>;
    /// Removes the vector `id`. The `bool` presumably reports whether something was
    /// removed; confirm.
    async fn remove_vector(&self, id: &str) -> Result<bool>;
    /// Whether any embeddings are stored for `session_id`.
    async fn has_session_embeddings(&self, session_id: &str) -> bool;
    /// Number of embeddings stored for `session_id`.
    async fn count_session_embeddings(&self, session_id: &str) -> usize;
    /// Total number of stored vectors.
    async fn total_count(&self) -> usize;
    /// Returns every vector (with metadata) stored for `session_id`.
    async fn get_session_vectors(
        &self,
        session_id: &str,
    ) -> Result<Vec<(Vec<f32>, VectorMetadata)>>;
    /// Returns every stored vector with its metadata.
    async fn get_all_vectors(&self) -> Result<Vec<(Vec<f32>, VectorMetadata)>>;
}
/// Source-freshness tracking: registering source references, checking entries against file /
/// AST hashes, and invalidating entries when a source file or symbol changes.
#[async_trait]
pub trait FreshnessStorage: Send + Sync {
    /// Registers `reference` for session `session_id`.
    async fn register_source(&self, session_id: Uuid, reference: SourceReference) -> Result<()>;

    /// Checks entry `entry_id` against the current `file_hash`.
    async fn check_freshness(&self, entry_id: &str, file_hash: &[u8]) -> Result<FreshnessEntry>;

    /// Semantic variant of [`check_freshness`](Self::check_freshness) that can also take an
    /// AST hash and a symbol name.
    ///
    /// The default ignores `_ast_hash` and `_symbol_name` and falls back to the plain
    /// file-hash check. Engines that can compare at the AST or symbol level may override it.
    async fn check_freshness_semantic(
        &self,
        entry_id: &str,
        file_hash: &[u8],
        _ast_hash: Option<&[u8]>,
        _symbol_name: Option<&str>,
    ) -> Result<FreshnessEntry> {
        self.check_freshness(entry_id, file_hash).await
    }

    /// Invalidates entries derived from `file_path`; returns a count (presumably of
    /// invalidated entries — confirm).
    async fn invalidate_source(&self, file_path: &str) -> Result<u32>;

    /// Returns the source references registered for `file_path`.
    async fn get_entries_by_source(&self, file_path: &str) -> Result<Vec<SourceReference>>;

    /// Returns the stale entries recorded for `file_path`.
    async fn get_stale_entries_by_source(&self, file_path: &str) -> Result<Vec<StaleEntryInfo>>;

    /// Records that `from` depends on each symbol in `to_symbols`; returns a count
    /// (presumably of registered dependencies — confirm).
    async fn register_symbol_dependencies(
        &self,
        from: SymbolId,
        to_symbols: Vec<SymbolId>,
    ) -> Result<u32>;

    /// Propagates a change of `changed` (now hashing to `new_ast_hash`) through the recorded
    /// symbol dependencies, bounded by `max_depth`.
    async fn cascade_invalidate(
        &self,
        changed: SymbolId,
        new_ast_hash: Vec<u8>,
        max_depth: u32,
    ) -> Result<CascadeInvalidateReport>;

    /// Checks many entries at once. Each tuple is `(entry_id, file_hash, ast_hash, symbol_name)`.
    ///
    /// Behaviour of the default implementation:
    /// - Entries are checked sequentially, in input order, through
    ///   [`check_freshness_semantic`](Self::check_freshness_semantic).
    /// - A failed check does not abort the batch. That entry is reported with status
    ///   `Unknown`, an empty `file_path` and `stored_hash`, and `current_hash` set to the
    ///   supplied file hash.
    /// - The underlying error is discarded, so the default always returns `Ok`.
    async fn check_freshness_batch(
        &self,
        entries: Vec<(String, Vec<u8>, Option<Vec<u8>>, Option<String>)>,
    ) -> Result<Vec<FreshnessEntry>> {
        let mut checked = Vec::with_capacity(entries.len());
        for (entry_id, file_hash, ast_hash, symbol_name) in entries {
            let outcome = self
                .check_freshness_semantic(
                    &entry_id,
                    &file_hash,
                    ast_hash.as_deref(),
                    symbol_name.as_deref(),
                )
                .await;
            let entry = match outcome {
                Ok(found) => found,
                // Errors are not propagated; the entry is reported as `Unknown` instead.
                Err(_) => FreshnessEntry {
                    entry_id,
                    file_path: String::new(),
                    status: post_cortex_proto::pb::FreshnessStatus::Unknown as i32,
                    stored_hash: Vec::new(),
                    current_hash: file_hash,
                },
            };
            checked.push(entry);
        }
        Ok(checked)
    }
}
/// The storage engine chosen at runtime.
///
/// The trait impls on this type forward each call to the wrapped engine. The SurrealDB
/// variant exists only with the `surrealdb-storage` feature and holds its engine behind an
/// `Arc`, so cloning it is a reference-count bump. Cloning the RocksDB variant clones
/// `RealRocksDBStorage` itself; whether that is cheap depends on that type, so confirm.
#[derive(Clone)]
pub enum StorageBackend {
    /// RocksDB-backed storage.
    RocksDB(crate::RealRocksDBStorage),
    /// SurrealDB-backed storage, shared via `Arc`.
    #[cfg(feature = "surrealdb-storage")]
    SurrealDB(std::sync::Arc<crate::surrealdb_storage::SurrealDBStorage>),
}
impl StorageBackend {
pub fn supports_native_graph(&self) -> bool {
match self {
StorageBackend::RocksDB(_) => false,
#[cfg(feature = "surrealdb-storage")]
StorageBackend::SurrealDB(_) => true,
}
}
pub fn supports_native_vectors(&self) -> bool {
match self {
StorageBackend::RocksDB(_) => false,
#[cfg(feature = "surrealdb-storage")]
StorageBackend::SurrealDB(_) => true,
}
}
}
#[async_trait]
impl Storage for StorageBackend {
async fn save_session(&self, session: &ActiveSession) -> Result<()> {
match self {
StorageBackend::RocksDB(storage) => storage.save_session(session).await,
#[cfg(feature = "surrealdb-storage")]
StorageBackend::SurrealDB(storage) => storage.save_session(session).await,
}
}
async fn load_session(&self, session_id: Uuid) -> Result<ActiveSession> {
match self {
StorageBackend::RocksDB(storage) => storage.load_session(session_id).await,
#[cfg(feature = "surrealdb-storage")]
StorageBackend::SurrealDB(storage) => storage.load_session(session_id).await,
}
}
async fn delete_session(&self, session_id: Uuid) -> Result<()> {
match self {
StorageBackend::RocksDB(storage) => storage.delete_session(session_id).await,
#[cfg(feature = "surrealdb-storage")]
StorageBackend::SurrealDB(storage) => storage.delete_session(session_id).await,
}
}
async fn clear_session_entities(&self, session_id: Uuid) -> Result<()> {
match self {
StorageBackend::RocksDB(storage) => storage.clear_session_entities(session_id).await,
#[cfg(feature = "surrealdb-storage")]
StorageBackend::SurrealDB(storage) => storage.clear_session_entities(session_id).await,
}
}
async fn list_sessions(&self) -> Result<Vec<Uuid>> {
match self {
StorageBackend::RocksDB(storage) => storage.list_sessions().await,
#[cfg(feature = "surrealdb-storage")]
StorageBackend::SurrealDB(storage) => storage.list_sessions().await,
}
}
async fn session_exists(&self, session_id: Uuid) -> Result<bool> {
match self {
StorageBackend::RocksDB(storage) => storage.session_exists(session_id).await,
#[cfg(feature = "surrealdb-storage")]
StorageBackend::SurrealDB(storage) => storage.session_exists(session_id).await,
}
}
async fn batch_save_updates(
&self,
session_id: Uuid,
updates: Vec<ContextUpdate>,
) -> Result<()> {
match self {
StorageBackend::RocksDB(storage) => {
storage.batch_save_updates(session_id, updates).await
}
#[cfg(feature = "surrealdb-storage")]
StorageBackend::SurrealDB(storage) => {
storage.batch_save_updates(session_id, updates).await
}
}
}
async fn save_session_with_updates(
&self,
session: &ActiveSession,
session_id: Uuid,
updates: Vec<ContextUpdate>,
) -> Result<()> {
match self {
StorageBackend::RocksDB(storage) => {
storage
.save_session_with_updates(session, session_id, updates)
.await
}
#[cfg(feature = "surrealdb-storage")]
StorageBackend::SurrealDB(_) => {
self.save_session(session).await?;
if !updates.is_empty() {
self.batch_save_updates(session_id, updates).await?;
}
Ok(())
}
}
}
async fn load_session_updates(&self, session_id: Uuid) -> Result<Vec<ContextUpdate>> {
match self {
StorageBackend::RocksDB(storage) => storage.load_session_updates(session_id).await,
#[cfg(feature = "surrealdb-storage")]
StorageBackend::SurrealDB(storage) => storage.load_session_updates(session_id).await,
}
}
async fn save_checkpoint(&self, checkpoint: &SessionCheckpoint) -> Result<()> {
match self {
StorageBackend::RocksDB(storage) => storage.save_checkpoint(checkpoint).await,
#[cfg(feature = "surrealdb-storage")]
StorageBackend::SurrealDB(storage) => storage.save_checkpoint(checkpoint).await,
}
}
async fn load_checkpoint(&self, checkpoint_id: Uuid) -> Result<SessionCheckpoint> {
match self {
StorageBackend::RocksDB(storage) => storage.load_checkpoint(checkpoint_id).await,
#[cfg(feature = "surrealdb-storage")]
StorageBackend::SurrealDB(storage) => storage.load_checkpoint(checkpoint_id).await,
}
}
async fn list_checkpoints(&self) -> Result<Vec<SessionCheckpoint>> {
match self {
StorageBackend::RocksDB(storage) => storage.list_checkpoints().await,
#[cfg(feature = "surrealdb-storage")]
StorageBackend::SurrealDB(storage) => storage.list_checkpoints().await,
}
}
async fn save_workspace_metadata(
&self,
workspace_id: Uuid,
name: &str,
description: &str,
session_ids: &[Uuid],
) -> Result<()> {
match self {
StorageBackend::RocksDB(storage) => {
storage
.save_workspace_metadata(workspace_id, name, description, session_ids)
.await
}
#[cfg(feature = "surrealdb-storage")]
StorageBackend::SurrealDB(storage) => {
storage
.save_workspace_metadata(workspace_id, name, description, session_ids)
.await
}
}
}
async fn delete_workspace(&self, workspace_id: Uuid) -> Result<()> {
match self {
StorageBackend::RocksDB(storage) => storage.delete_workspace(workspace_id).await,
#[cfg(feature = "surrealdb-storage")]
StorageBackend::SurrealDB(storage) => storage.delete_workspace(workspace_id).await,
}
}
async fn list_workspaces(&self) -> Result<Vec<StoredWorkspace>> {
match self {
StorageBackend::RocksDB(storage) => storage.list_workspaces().await,
#[cfg(feature = "surrealdb-storage")]
StorageBackend::SurrealDB(storage) => storage.list_workspaces().await,
}
}
async fn add_session_to_workspace(
&self,
workspace_id: Uuid,
session_id: Uuid,
role: SessionRole,
) -> Result<()> {
match self {
StorageBackend::RocksDB(storage) => {
storage
.add_session_to_workspace(workspace_id, session_id, role)
.await
}
#[cfg(feature = "surrealdb-storage")]
StorageBackend::SurrealDB(storage) => {
storage
.add_session_to_workspace(workspace_id, session_id, role)
.await
}
}
}
async fn remove_session_from_workspace(
&self,
workspace_id: Uuid,
session_id: Uuid,
) -> Result<()> {
match self {
StorageBackend::RocksDB(storage) => {
storage
.remove_session_from_workspace(workspace_id, session_id)
.await
}
#[cfg(feature = "surrealdb-storage")]
StorageBackend::SurrealDB(storage) => {
storage
.remove_session_from_workspace(workspace_id, session_id)
.await
}
}
}
async fn compact(&self) -> Result<()> {
match self {
StorageBackend::RocksDB(storage) => storage.compact().await,
#[cfg(feature = "surrealdb-storage")]
StorageBackend::SurrealDB(storage) => storage.compact().await,
}
}
async fn get_key_count(&self) -> Result<usize> {
match self {
StorageBackend::RocksDB(storage) => storage.get_key_count().await,
#[cfg(feature = "surrealdb-storage")]
StorageBackend::SurrealDB(storage) => storage.get_key_count().await,
}
}
async fn get_stats(&self) -> Result<String> {
match self {
StorageBackend::RocksDB(storage) => storage.get_stats().await,
#[cfg(feature = "surrealdb-storage")]
StorageBackend::SurrealDB(storage) => storage.get_stats().await,
}
}
}
/// Forwards every [`GraphStorage`] call to the engine held by the active variant.
#[async_trait]
impl GraphStorage for StorageBackend {
    async fn upsert_entity(&self, session_id: Uuid, entity: &EntityData) -> Result<()> {
        match self {
            Self::RocksDB(inner) => inner.upsert_entity(session_id, entity).await,
            #[cfg(feature = "surrealdb-storage")]
            Self::SurrealDB(inner) => inner.upsert_entity(session_id, entity).await,
        }
    }

    async fn get_entity(&self, session_id: Uuid, name: &str) -> Result<Option<EntityData>> {
        match self {
            Self::RocksDB(inner) => inner.get_entity(session_id, name).await,
            #[cfg(feature = "surrealdb-storage")]
            Self::SurrealDB(inner) => inner.get_entity(session_id, name).await,
        }
    }

    async fn list_entities(&self, session_id: Uuid) -> Result<Vec<EntityData>> {
        match self {
            Self::RocksDB(inner) => inner.list_entities(session_id).await,
            #[cfg(feature = "surrealdb-storage")]
            Self::SurrealDB(inner) => inner.list_entities(session_id).await,
        }
    }

    async fn delete_entity(&self, session_id: Uuid, name: &str) -> Result<()> {
        match self {
            Self::RocksDB(inner) => inner.delete_entity(session_id, name).await,
            #[cfg(feature = "surrealdb-storage")]
            Self::SurrealDB(inner) => inner.delete_entity(session_id, name).await,
        }
    }

    async fn create_relationship(
        &self,
        session_id: Uuid,
        relationship: &EntityRelationship,
    ) -> Result<()> {
        match self {
            Self::RocksDB(inner) => inner.create_relationship(session_id, relationship).await,
            #[cfg(feature = "surrealdb-storage")]
            Self::SurrealDB(inner) => inner.create_relationship(session_id, relationship).await,
        }
    }

    async fn find_related_entities(
        &self,
        session_id: Uuid,
        entity_name: &str,
    ) -> Result<Vec<String>> {
        match self {
            Self::RocksDB(inner) => inner.find_related_entities(session_id, entity_name).await,
            #[cfg(feature = "surrealdb-storage")]
            Self::SurrealDB(inner) => {
                inner.find_related_entities(session_id, entity_name).await
            }
        }
    }

    async fn find_related_by_type(
        &self,
        session_id: Uuid,
        entity_name: &str,
        relation_type: &RelationType,
    ) -> Result<Vec<String>> {
        match self {
            Self::RocksDB(inner) => {
                inner
                    .find_related_by_type(session_id, entity_name, relation_type)
                    .await
            }
            #[cfg(feature = "surrealdb-storage")]
            Self::SurrealDB(inner) => {
                inner
                    .find_related_by_type(session_id, entity_name, relation_type)
                    .await
            }
        }
    }

    async fn find_shortest_path(
        &self,
        session_id: Uuid,
        from: &str,
        to: &str,
    ) -> Result<Option<Vec<String>>> {
        match self {
            Self::RocksDB(inner) => inner.find_shortest_path(session_id, from, to).await,
            #[cfg(feature = "surrealdb-storage")]
            Self::SurrealDB(inner) => inner.find_shortest_path(session_id, from, to).await,
        }
    }

    async fn get_entity_network(
        &self,
        session_id: Uuid,
        center: &str,
        max_depth: usize,
    ) -> Result<EntityNetwork> {
        match self {
            Self::RocksDB(inner) => {
                inner
                    .get_entity_network(session_id, center, max_depth)
                    .await
            }
            #[cfg(feature = "surrealdb-storage")]
            Self::SurrealDB(inner) => {
                inner
                    .get_entity_network(session_id, center, max_depth)
                    .await
            }
        }
    }
}
/// Forwards every [`FreshnessStorage`] call to the engine held by the active variant. This
/// includes the methods that have trait defaults, so each engine's own versions of those
/// methods are the ones that run.
#[async_trait]
impl FreshnessStorage for StorageBackend {
    async fn register_source(&self, session_id: Uuid, reference: SourceReference) -> Result<()> {
        match self {
            Self::RocksDB(inner) => inner.register_source(session_id, reference).await,
            #[cfg(feature = "surrealdb-storage")]
            Self::SurrealDB(inner) => inner.register_source(session_id, reference).await,
        }
    }

    async fn check_freshness(&self, entry_id: &str, file_hash: &[u8]) -> Result<FreshnessEntry> {
        match self {
            Self::RocksDB(inner) => inner.check_freshness(entry_id, file_hash).await,
            #[cfg(feature = "surrealdb-storage")]
            Self::SurrealDB(inner) => inner.check_freshness(entry_id, file_hash).await,
        }
    }

    async fn invalidate_source(&self, file_path: &str) -> Result<u32> {
        match self {
            Self::RocksDB(inner) => inner.invalidate_source(file_path).await,
            #[cfg(feature = "surrealdb-storage")]
            Self::SurrealDB(inner) => inner.invalidate_source(file_path).await,
        }
    }

    async fn get_entries_by_source(&self, file_path: &str) -> Result<Vec<SourceReference>> {
        match self {
            Self::RocksDB(inner) => inner.get_entries_by_source(file_path).await,
            #[cfg(feature = "surrealdb-storage")]
            Self::SurrealDB(inner) => inner.get_entries_by_source(file_path).await,
        }
    }

    async fn get_stale_entries_by_source(&self, file_path: &str) -> Result<Vec<StaleEntryInfo>> {
        match self {
            Self::RocksDB(inner) => inner.get_stale_entries_by_source(file_path).await,
            #[cfg(feature = "surrealdb-storage")]
            Self::SurrealDB(inner) => inner.get_stale_entries_by_source(file_path).await,
        }
    }

    async fn check_freshness_semantic(
        &self,
        entry_id: &str,
        file_hash: &[u8],
        ast_hash: Option<&[u8]>,
        symbol_name: Option<&str>,
    ) -> Result<FreshnessEntry> {
        match self {
            Self::RocksDB(inner) => {
                inner
                    .check_freshness_semantic(entry_id, file_hash, ast_hash, symbol_name)
                    .await
            }
            #[cfg(feature = "surrealdb-storage")]
            Self::SurrealDB(inner) => {
                inner
                    .check_freshness_semantic(entry_id, file_hash, ast_hash, symbol_name)
                    .await
            }
        }
    }

    async fn register_symbol_dependencies(
        &self,
        from: SymbolId,
        to_symbols: Vec<SymbolId>,
    ) -> Result<u32> {
        match self {
            Self::RocksDB(inner) => inner.register_symbol_dependencies(from, to_symbols).await,
            #[cfg(feature = "surrealdb-storage")]
            Self::SurrealDB(inner) => {
                inner.register_symbol_dependencies(from, to_symbols).await
            }
        }
    }

    async fn cascade_invalidate(
        &self,
        changed: SymbolId,
        new_ast_hash: Vec<u8>,
        max_depth: u32,
    ) -> Result<CascadeInvalidateReport> {
        match self {
            Self::RocksDB(inner) => {
                inner
                    .cascade_invalidate(changed, new_ast_hash, max_depth)
                    .await
            }
            #[cfg(feature = "surrealdb-storage")]
            Self::SurrealDB(inner) => {
                inner
                    .cascade_invalidate(changed, new_ast_hash, max_depth)
                    .await
            }
        }
    }

    async fn check_freshness_batch(
        &self,
        entries: Vec<(String, Vec<u8>, Option<Vec<u8>>, Option<String>)>,
    ) -> Result<Vec<FreshnessEntry>> {
        match self {
            Self::RocksDB(inner) => inner.check_freshness_batch(entries).await,
            #[cfg(feature = "surrealdb-storage")]
            Self::SurrealDB(inner) => inner.check_freshness_batch(entries).await,
        }
    }
}
/// Storage configuration: which engine to use and where its data lives.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct StorageConfig {
    /// Engine selection.
    pub backend: StorageBackendType,
    /// Data directory for the engine.
    pub path: std::path::PathBuf,
    /// SurrealDB-specific settings; only present with the `surrealdb-storage` feature.
    #[cfg(feature = "surrealdb-storage")]
    pub surrealdb: Option<SurrealDBConfig>,
}
/// Names a storage engine.
///
/// Variants serialize in lowercase (`"rocksdb"`, `"surrealdb"`). The default is
/// [`StorageBackendType::RocksDB`].
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum StorageBackendType {
    /// RocksDB engine; the default.
    #[default]
    RocksDB,
    /// SurrealDB engine; only available with the `surrealdb-storage` feature.
    #[cfg(feature = "surrealdb-storage")]
    SurrealDB,
}
impl StorageBackendType {
    /// Parses a backend name case-insensitively, ignoring surrounding whitespace.
    ///
    /// Accepts `"rocksdb"` / `"rocks"` and, with the `surrealdb-storage` feature enabled,
    /// `"surrealdb"` / `"surreal"`. Returns `None` for any other input.
    ///
    /// New code can use `s.parse::<StorageBackendType>()` via the [`std::str::FromStr`]
    /// impl instead. This inherent method is kept for existing callers.
    // The inherent name is public API that predates the `FromStr` impl. The clippy lint
    // asking for the trait is therefore silenced rather than the method being renamed.
    #[allow(clippy::should_implement_trait)]
    pub fn from_str(s: &str) -> Option<Self> {
        Self::parse_name(s)
    }

    /// Shared parser behind the inherent `from_str` and the `FromStr` impl.
    fn parse_name(s: &str) -> Option<Self> {
        // Trim so values read from env vars or config files with stray whitespace or a
        // trailing newline still match.
        match s.trim().to_lowercase().as_str() {
            "rocksdb" | "rocks" => Some(StorageBackendType::RocksDB),
            #[cfg(feature = "surrealdb-storage")]
            "surrealdb" | "surreal" => Some(StorageBackendType::SurrealDB),
            _ => None,
        }
    }
}

impl std::str::FromStr for StorageBackendType {
    type Err = String;

    /// Parses a backend name with the same rules as the inherent
    /// [`StorageBackendType::from_str`].
    ///
    /// # Errors
    ///
    /// Returns a message quoting the input when it does not name a known backend.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Self::parse_name(s).ok_or_else(|| format!("unknown storage backend: {s:?}"))
    }
}
/// SurrealDB connection settings (only with the `surrealdb-storage` feature).
#[cfg(feature = "surrealdb-storage")]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SurrealDBConfig {
    /// SurrealDB namespace.
    pub namespace: String,
    /// SurrealDB database within the namespace.
    pub database: String,
    /// Optional TiKV endpoint list. `None` presumably selects a non-TiKV engine; confirm in
    /// the SurrealDB storage setup.
    pub tikv_endpoints: Option<Vec<String>>,
}
/// Defaults: namespace `post_cortex`, database `main`, no TiKV endpoints.
#[cfg(feature = "surrealdb-storage")]
impl Default for SurrealDBConfig {
    fn default() -> Self {
        let namespace = String::from("post_cortex");
        let database = String::from("main");
        Self {
            namespace,
            database,
            tikv_endpoints: None,
        }
    }
}
impl Default for StorageConfig {
    /// Default engine (RocksDB), stored in `<local data dir>/post-cortex/data`.
    ///
    /// The local data dir comes from `dirs::data_local_dir()`. When that yields `None`, the
    /// current working directory (`.`) is used instead. No SurrealDB settings are set.
    fn default() -> Self {
        let base_dir = dirs::data_local_dir().unwrap_or_else(|| std::path::PathBuf::from("."));
        Self {
            backend: StorageBackendType::default(),
            path: base_dir.join("post-cortex").join("data"),
            #[cfg(feature = "surrealdb-storage")]
            surrealdb: None,
        }
    }
}