use anyhow::Result;
use std::sync::Arc;
use arrow_array::{Array, Int64Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use futures::TryStreamExt;
use lancedb::{
query::{ExecutableQuery, QueryBase, Select},
Connection,
};
use crate::store::table_ops::TableOperations;
/// High-level accessors for the index-bookkeeping tables kept in LanceDB:
/// single-row "last indexed commit" tables and a per-file mtime table.
pub struct MetadataOperations<'a> {
    /// Open LanceDB connection the metadata tables live in.
    pub db: &'a Connection,
    /// Low-level table helpers (existence checks, create/clear, batch writes)
    /// built over the same connection.
    pub table_ops: TableOperations<'a>,
}
impl<'a> MetadataOperations<'a> {
pub fn new(db: &'a Connection) -> Self {
Self {
db,
table_ops: TableOperations::new(db),
}
}
pub async fn store_git_metadata(&self, commit_hash: &str) -> Result<()> {
if !self.table_ops.table_exists("git_metadata").await? {
self.create_git_metadata_table().await?;
}
if let Ok(Some(existing_hash)) = self.get_last_commit_hash().await {
if existing_hash == commit_hash {
return Ok(());
}
}
let schema = Arc::new(Schema::new(vec![
Field::new("commit_hash", DataType::Utf8, false),
Field::new("indexed_at", DataType::Int64, false),
]));
let commit_hashes = vec![commit_hash];
let timestamps = vec![chrono::Utc::now().timestamp()];
let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(StringArray::from(commit_hashes)),
Arc::new(Int64Array::from(timestamps)),
],
)?;
self.table_ops.clear_table("git_metadata").await?;
self.table_ops.store_batch("git_metadata", batch).await?;
Ok(())
}
pub async fn get_last_commit_hash(&self) -> Result<Option<String>> {
if !self.table_ops.table_exists("git_metadata").await? {
return Ok(None);
}
let table = self.db.open_table("git_metadata").execute().await?;
let mut results = table
.query()
.select(Select::Columns(vec!["commit_hash".to_string()]))
.limit(1)
.execute()
.await?;
while let Some(batch) = results.try_next().await? {
if batch.num_rows() > 0 {
if let Some(column) = batch.column_by_name("commit_hash") {
if let Some(hash_array) = column.as_any().downcast_ref::<StringArray>() {
if let Some(hash) = hash_array.iter().next() {
return Ok(hash.map(|s| s.to_string()));
}
}
}
}
}
Ok(None)
}
pub async fn store_file_metadata(&self, file_path: &str, mtime: u64) -> Result<()> {
if !self.table_ops.table_exists("file_metadata").await? {
self.create_file_metadata_table().await?;
}
let table = self.db.open_table("file_metadata").execute().await?;
let mut existing_results = table
.query()
.only_if(format!("path = '{}'", file_path))
.limit(1)
.execute()
.await?;
let mut file_exists = false;
while let Some(batch) = existing_results.try_next().await? {
if batch.num_rows() > 0 {
file_exists = true;
break;
}
}
if file_exists {
table
.update()
.only_if(format!("path = '{}'", file_path))
.column("mtime", (mtime as i64).to_string())
.column("indexed_at", chrono::Utc::now().timestamp().to_string())
.execute()
.await?;
} else {
let schema = Arc::new(Schema::new(vec![
Field::new("path", DataType::Utf8, false),
Field::new("mtime", DataType::Int64, false),
Field::new("indexed_at", DataType::Int64, false),
]));
let paths = vec![file_path];
let mtimes = vec![mtime as i64];
let timestamps = vec![chrono::Utc::now().timestamp()];
let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(StringArray::from(paths)),
Arc::new(Int64Array::from(mtimes)),
Arc::new(Int64Array::from(timestamps)),
],
)?;
use std::iter::once;
let batches = once(Ok(batch.clone()));
let batch_reader =
arrow::record_batch::RecordBatchIterator::new(batches, batch.schema());
table.add(batch_reader).execute().await?;
}
Ok(())
}
pub async fn get_file_mtime(&self, file_path: &str) -> Result<Option<u64>> {
if !self.table_ops.table_exists("file_metadata").await? {
return Ok(None);
}
let table = self.db.open_table("file_metadata").execute().await?;
let mut results = table
.query()
.only_if(format!("path = '{}'", file_path))
.select(Select::Columns(vec!["mtime".to_string()]))
.limit(1)
.execute()
.await?;
while let Some(batch) = results.try_next().await? {
if batch.num_rows() > 0 {
if let Some(column) = batch.column_by_name("mtime") {
if let Some(mtime_array) = column.as_any().downcast_ref::<Int64Array>() {
if let Some(mtime) = mtime_array.iter().next() {
return Ok(mtime.map(|t| t as u64));
}
}
}
}
}
Ok(None)
}
pub async fn get_all_file_metadata(&self) -> Result<std::collections::HashMap<String, u64>> {
let mut metadata_map = std::collections::HashMap::new();
if !self.table_ops.table_exists("file_metadata").await? {
return Ok(metadata_map);
}
let table = self.db.open_table("file_metadata").execute().await?;
let mut results = table
.query()
.select(Select::Columns(vec![
"path".to_string(),
"mtime".to_string(),
]))
.execute()
.await?;
while let Some(batch) = results.try_next().await? {
if batch.num_rows() > 0 {
if let (Some(path_column), Some(mtime_column)) =
(batch.column_by_name("path"), batch.column_by_name("mtime"))
{
if let (Some(path_array), Some(mtime_array)) = (
path_column.as_any().downcast_ref::<StringArray>(),
mtime_column.as_any().downcast_ref::<Int64Array>(),
) {
for i in 0..path_array.len() {
if let (Some(path), Some(mtime)) = (
path_array.iter().nth(i).flatten(),
mtime_array.iter().nth(i).flatten(),
) {
metadata_map.insert(path.to_string(), mtime as u64);
}
}
}
}
}
}
Ok(metadata_map)
}
pub async fn clear_git_metadata(&self) -> Result<()> {
self.table_ops.clear_table("git_metadata").await
}
async fn create_git_metadata_table(&self) -> Result<()> {
let schema = Arc::new(Schema::new(vec![
Field::new("commit_hash", DataType::Utf8, false),
Field::new("indexed_at", DataType::Int64, false),
]));
self.table_ops
.create_table_with_schema("git_metadata", schema)
.await
}
async fn create_file_metadata_table(&self) -> Result<()> {
let schema = Arc::new(Schema::new(vec![
Field::new("path", DataType::Utf8, false),
Field::new("mtime", DataType::Int64, false),
Field::new("indexed_at", DataType::Int64, false),
]));
self.table_ops
.create_table_with_schema("file_metadata", schema)
.await
}
pub async fn get_graphrag_last_commit_hash(&self) -> Result<Option<String>> {
if !self.table_ops.table_exists("graphrag_git_metadata").await? {
return Ok(None);
}
let table = self
.db
.open_table("graphrag_git_metadata")
.execute()
.await?;
let mut results = table
.query()
.select(Select::Columns(vec!["commit_hash".to_string()]))
.limit(1)
.execute()
.await?;
while let Some(batch) = results.try_next().await? {
if batch.num_rows() > 0 {
if let Some(column) = batch.column_by_name("commit_hash") {
if let Some(hash_array) = column.as_any().downcast_ref::<StringArray>() {
if let Some(hash) = hash_array.iter().next() {
return Ok(hash.map(|s| s.to_string()));
}
}
}
}
}
Ok(None)
}
pub async fn store_graphrag_commit_hash(&self, commit_hash: &str) -> Result<()> {
if !self.table_ops.table_exists("graphrag_git_metadata").await? {
self.create_graphrag_git_metadata_table().await?;
}
if let Ok(Some(existing_hash)) = self.get_graphrag_last_commit_hash().await {
if existing_hash == commit_hash {
return Ok(());
}
}
let schema = Arc::new(Schema::new(vec![
Field::new("commit_hash", DataType::Utf8, false),
Field::new("indexed_at", DataType::Int64, false),
]));
let commit_hashes = vec![commit_hash];
let timestamps = vec![chrono::Utc::now().timestamp()];
let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(StringArray::from(commit_hashes)),
Arc::new(Int64Array::from(timestamps)),
],
)?;
self.table_ops.clear_table("graphrag_git_metadata").await?;
self.table_ops
.store_batch("graphrag_git_metadata", batch)
.await?;
Ok(())
}
async fn create_graphrag_git_metadata_table(&self) -> Result<()> {
let schema = Arc::new(Schema::new(vec![
Field::new("commit_hash", DataType::Utf8, false),
Field::new("indexed_at", DataType::Int64, false),
]));
self.table_ops
.create_table_with_schema("graphrag_git_metadata", schema)
.await
}
pub async fn get_commits_last_commit_hash(&self) -> Result<Option<String>> {
if !self.table_ops.table_exists("commits_git_metadata").await? {
return Ok(None);
}
let table = self.db.open_table("commits_git_metadata").execute().await?;
let mut results = table
.query()
.select(Select::Columns(vec!["commit_hash".to_string()]))
.limit(1)
.execute()
.await?;
while let Some(batch) = results.try_next().await? {
if batch.num_rows() > 0 {
if let Some(column) = batch.column_by_name("commit_hash") {
if let Some(hash_array) = column.as_any().downcast_ref::<StringArray>() {
if let Some(hash) = hash_array.iter().next() {
return Ok(hash.map(|s| s.to_string()));
}
}
}
}
}
Ok(None)
}
pub async fn store_commits_last_commit_hash(&self, commit_hash: &str) -> Result<()> {
if !self.table_ops.table_exists("commits_git_metadata").await? {
self.create_commits_git_metadata_table().await?;
}
if let Ok(Some(existing_hash)) = self.get_commits_last_commit_hash().await {
if existing_hash == commit_hash {
return Ok(());
}
}
let schema = Arc::new(Schema::new(vec![
Field::new("commit_hash", DataType::Utf8, false),
Field::new("indexed_at", DataType::Int64, false),
]));
let commit_hashes = vec![commit_hash];
let timestamps = vec![chrono::Utc::now().timestamp()];
let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(StringArray::from(commit_hashes)),
Arc::new(Int64Array::from(timestamps)),
],
)?;
self.table_ops.clear_table("commits_git_metadata").await?;
self.table_ops
.store_batch("commits_git_metadata", batch)
.await?;
Ok(())
}
async fn create_commits_git_metadata_table(&self) -> Result<()> {
let schema = Arc::new(Schema::new(vec![
Field::new("commit_hash", DataType::Utf8, false),
Field::new("indexed_at", DataType::Int64, false),
]));
self.table_ops
.create_table_with_schema("commits_git_metadata", schema)
.await
}
}