use anyhow::{Result, anyhow};
use std::path::{Path, PathBuf};
use tokio::sync::mpsc;
use walkdir::WalkDir;
/// Snapshot of an indexing run, sent over a channel from the background
/// indexing task so a UI can render it.
#[derive(Debug, Clone)]
pub struct IndexProgress {
    /// File name of the entry currently being indexed (`"Complete"` on the
    /// final update of a successful run).
    pub current_file: String,
    /// Number of files already handled before `current_file`; set to `total`
    /// once the run finishes.
    pub processed: usize,
    /// Total number of files the run will visit.
    pub total: usize,
    /// Files skipped as duplicates or because indexing them failed.
    pub skipped: usize,
    /// `true` on the last update of a run (successful or not).
    pub complete: bool,
    /// Human-readable failure reason, if the run failed or found nothing.
    pub error: Option<String>,
}

impl IndexProgress {
    /// Creates an empty progress record for a run over `total` files.
    pub fn new(total: usize) -> Self {
        Self {
            current_file: String::new(),
            processed: 0,
            total,
            skipped: 0,
            complete: false,
            error: None,
        }
    }

    /// Completion percentage in `0..=100`, truncated toward zero.
    ///
    /// Returns 0 when `total` is 0. `processed` is clamped to `total`, so an
    /// over-count never reports more than 100. The value is computed with
    /// integer arithmetic so exact ratios stay exact: 29/100 reports 29,
    /// whereas the float form `0.29 * 100.0` evaluates to 28.999… and would
    /// truncate to 28.
    pub fn percent(&self) -> u8 {
        if self.total == 0 {
            return 0;
        }
        // Widen to u128 so `done * 100` cannot overflow for any usize input.
        let done = self.processed.min(self.total) as u128;
        let pct = done * 100 / self.total as u128;
        // `done <= total` guarantees `pct <= 100`, so this never saturates.
        u8::try_from(pct).unwrap_or(100)
    }

    /// Renders a textual bar such as `[=====     ] 50% (5/10)`.
    ///
    /// `width` is the number of cells between the brackets; the filled part
    /// is proportional to [`IndexProgress::percent`].
    pub fn progress_bar(&self, width: usize) -> String {
        let pct = self.percent();
        let filled = usize::from(pct) * width / 100;
        let empty = width.saturating_sub(filled);
        format!(
            "[{}{}] {}% ({}/{})",
            "=".repeat(filled),
            " ".repeat(empty),
            pct,
            self.processed,
            self.total
        )
    }
}
/// The choices offered on the data-setup screen, listed in menu order by
/// [`DataSetupOption::all`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DataSetupOption {
    /// Bring in a LanceDB database that already exists.
    ImportLanceDB,
    /// Index a directory right away.
    IndexDirectory,
    /// Defer data setup.
    Skip,
}

impl DataSetupOption {
    /// Numbered menu label shown for this option.
    pub fn label(&self) -> &'static str {
        match *self {
            Self::ImportLanceDB => "[1] Import existing LanceDB",
            Self::IndexDirectory => "[2] Index a directory now",
            Self::Skip => "[3] Skip for now",
        }
    }

    /// One-line explanation shown under the label.
    pub fn detail(&self) -> &'static str {
        match *self {
            Self::ImportLanceDB => "Copy or link an existing LanceDB database",
            Self::IndexDirectory => "Recursively index files with embeddings",
            Self::Skip => "Configure data later via CLI",
        }
    }

    /// Every option, in the order the menu numbers them.
    pub fn all() -> Vec<DataSetupOption> {
        Vec::from([Self::ImportLanceDB, Self::IndexDirectory, Self::Skip])
    }
}
/// UI state for the data-setup step: which option was chosen, which sub-step
/// is active, and any text the user is currently typing.
#[derive(Debug, Clone)]
pub struct DataSetupState {
/// Option chosen via `select_focused` (defaults to `Skip`).
pub option: DataSetupOption,
/// Cursor index. Indexes `DataSetupOption::all()` while selecting an option;
/// reset to 0 on entering `SelectImportMode` (presumably then indexing
/// `ImportMode::all()` — confirm against the UI code).
pub focus: usize,
/// `true` while a free-text prompt (path or namespace) is active.
pub input_mode: bool,
/// Text currently being edited; pre-filled with `kb:<folder>` when the
/// namespace prompt opens for directory indexing.
pub input_buffer: String,
/// Trimmed path confirmed by `confirm_path`.
pub source_path: Option<String>,
/// Trimmed namespace confirmed by `confirm_namespace`.
pub namespace: Option<String>,
/// Latest indexing progress. Not written by the code in this file;
/// presumably filled in by the caller from `start_indexing`'s receiver.
pub progress: Option<IndexProgress>,
/// Which step of the flow is currently active.
pub sub_step: DataSetupSubStep,
/// Import strategy chosen by `select_import_mode` (defaults to `ConfigOnly`).
pub import_mode: ImportMode,
/// Message to show for an invalid entry. Not written by the code in this
/// file; NOTE(review): confirm where it is set and cleared.
pub validation_error: Option<String>,
}
/// Steps of the data-setup flow, advanced by the `DataSetupState` methods.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DataSetupSubStep {
/// Initial step: choosing a `DataSetupOption`.
SelectOption,
/// Typing a source path (for both import and directory indexing).
EnterPath,
/// Typing the namespace to index into (directory indexing only).
EnterNamespace,
/// Choosing an `ImportMode` (LanceDB import only).
SelectImportMode,
/// Namespace confirmed; indexing should run.
Indexing,
/// Terminal step; `is_done()` returns true.
Complete,
}
/// How `import_lancedb` puts an existing LanceDB database in place.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ImportMode {
    /// Copy the database files to the target location.
    Copy,
    /// Make the target location a symlink to the source.
    Symlink,
    /// Touch no files; only point the config at the source.
    ConfigOnly,
}

impl ImportMode {
    /// Numbered menu label for this mode.
    pub fn label(&self) -> &'static str {
        match *self {
            Self::Copy => "[1] Copy database files",
            Self::Symlink => "[2] Create symlink",
            Self::ConfigOnly => "[3] Just update config path",
        }
    }

    /// Every mode, in menu order.
    pub fn all() -> Vec<ImportMode> {
        Vec::from([Self::Copy, Self::Symlink, Self::ConfigOnly])
    }
}
impl Default for DataSetupState {
fn default() -> Self {
Self {
option: DataSetupOption::Skip,
focus: 0,
input_mode: false,
input_buffer: String::new(),
source_path: None,
namespace: None,
progress: None,
sub_step: DataSetupSubStep::SelectOption,
import_mode: ImportMode::ConfigOnly,
validation_error: None,
}
}
}
impl DataSetupState {
    /// Creates a fresh state; same as `DataSetupState::default()`.
    pub fn new() -> Self {
        Default::default()
    }

    /// Option under the cursor, or `Skip` when `focus` is out of range.
    pub fn focused_option(&self) -> DataSetupOption {
        DataSetupOption::all()
            .into_iter()
            .nth(self.focus)
            .unwrap_or(DataSetupOption::Skip)
    }

    /// Commits the focused option and moves to its first sub-step.
    ///
    /// Import and index both start with a path prompt (text input enabled,
    /// buffer cleared); `Skip` finishes immediately.
    pub fn select_focused(&mut self) {
        let chosen = self.focused_option();
        let next = match chosen {
            DataSetupOption::ImportLanceDB | DataSetupOption::IndexDirectory => {
                DataSetupSubStep::EnterPath
            }
            DataSetupOption::Skip => DataSetupSubStep::Complete,
        };
        let wants_path = next == DataSetupSubStep::EnterPath;
        self.option = chosen;
        self.sub_step = next;
        if wants_path {
            self.input_mode = true;
            self.input_buffer.clear();
        }
    }

    /// Accepts the typed path (ignored if blank after trimming) and advances.
    ///
    /// Import moves to import-mode selection with the cursor reset. Directory
    /// indexing opens the namespace prompt pre-filled with `kb:<folder name>`
    /// (or `kb:indexed` when the path has no usable final component).
    pub fn confirm_path(&mut self) {
        let path = match self.input_buffer.trim() {
            "" => return,
            entered => entered.to_owned(),
        };
        self.input_mode = false;
        match self.option {
            DataSetupOption::ImportLanceDB => {
                self.sub_step = DataSetupSubStep::SelectImportMode;
                self.focus = 0;
            }
            DataSetupOption::IndexDirectory => {
                let folder = Path::new(&path)
                    .file_name()
                    .and_then(|name| name.to_str())
                    .unwrap_or("indexed");
                self.input_buffer = format!("kb:{folder}");
                self.sub_step = DataSetupSubStep::EnterNamespace;
                self.input_mode = true;
            }
            DataSetupOption::Skip => self.sub_step = DataSetupSubStep::Complete,
        }
        self.source_path = Some(path);
    }

    /// Accepts the typed namespace (ignored if blank after trimming) and
    /// moves on to indexing.
    pub fn confirm_namespace(&mut self) {
        let entered = self.input_buffer.trim();
        if !entered.is_empty() {
            self.namespace = Some(entered.to_owned());
            self.input_mode = false;
            self.sub_step = DataSetupSubStep::Indexing;
        }
    }

    /// Records the chosen import strategy and finishes the flow.
    pub fn select_import_mode(&mut self, mode: ImportMode) {
        self.import_mode = mode;
        self.sub_step = DataSetupSubStep::Complete;
    }

    /// `true` once the flow has reached `Complete`.
    pub fn is_done(&self) -> bool {
        matches!(self.sub_step, DataSetupSubStep::Complete)
    }

    /// `true` while the flow sits in the `Indexing` step.
    pub fn is_indexing(&self) -> bool {
        matches!(self.sub_step, DataSetupSubStep::Indexing)
    }
}
/// Lowercase file extensions that directory indexing picks up; callers lowercase
/// the candidate extension before looking it up here.
const SUPPORTED_EXTENSIONS: &[&str] = &[
    // Plain text and lightweight markup
    "txt", "md", "markdown", "rst", "org",
    // Structured data and config
    "json", "yaml", "yml", "toml", "xml",
    // Source code
    "rs", "py", "js", "ts", "tsx", "jsx", "go", "java", "c", "cpp", "h", "hpp", "rb", "php",
    "swift", "kt", "scala",
    // Shells and query languages
    "sh", "bash", "zsh", "fish", "sql", "graphql",
    // Web front-end
    "html", "css", "scss", "sass", "less",
    // Documents
    "pdf",
];
/// Recursively collects every non-directory entry under `dir_path` whose
/// extension (lowercased) is in `SUPPORTED_EXTENSIONS`, sorted by path.
///
/// Entries whose name starts with `.` are skipped, and hidden directories are
/// not descended into, so trees such as `.git/` or `.venv/` contribute nothing.
/// The root itself is always walked, even if its own name is hidden. Symlinks
/// are followed; entries WalkDir reports as errors (unreadable entries,
/// symlink loops) are silently ignored.
///
/// # Errors
///
/// Returns an error if `dir_path` does not exist or is not a directory.
pub fn collect_indexable_files(dir_path: &Path) -> Result<Vec<PathBuf>> {
    if !dir_path.exists() {
        return Err(anyhow!("Directory does not exist: {}", dir_path.display()));
    }
    if !dir_path.is_dir() {
        return Err(anyhow!("Path is not a directory: {}", dir_path.display()));
    }

    let mut files: Vec<PathBuf> = WalkDir::new(dir_path)
        .follow_links(true)
        .into_iter()
        // `filter_entry` prunes a rejected directory *before* WalkDir descends
        // into it; a plain `continue` on the directory entry would not.
        .filter_entry(|entry| {
            entry.depth() == 0
                || !entry
                    .file_name()
                    .to_str()
                    .is_some_and(|name| name.starts_with('.'))
        })
        .filter_map(|entry| entry.ok())
        // With `follow_links(true)` this is the type of the link target.
        .filter(|entry| !entry.file_type().is_dir())
        .filter(|entry| {
            entry
                .path()
                .extension()
                .and_then(|ext| ext.to_str())
                .is_some_and(|ext| SUPPORTED_EXTENSIONS.contains(&ext.to_lowercase().as_str()))
        })
        .map(|entry| entry.into_path())
        .collect();
    files.sort();
    Ok(files)
}
/// Imports an existing LanceDB database at `source_path` into `target_path`
/// and returns a human-readable summary of what was done.
///
/// * `ImportMode::Copy` creates the target's parent directories, removes a
///   symlink already sitting at `target_path` (so the copy cannot be written
///   *through* it into the source), then recursively copies the source.
/// * `ImportMode::Symlink` replaces whatever is at `target_path` with a
///   symlink to the canonical (absolute) source path. A relative link target
///   would be resolved against the link's own directory and could dangle.
/// * `ImportMode::ConfigOnly` touches nothing on disk.
///
/// # Errors
///
/// Fails if the source does not exist, if the copy target lies inside the
/// source, if the symlink target is (or contains) the source, or if any
/// filesystem operation fails.
pub async fn import_lancedb(
    source_path: &Path,
    target_path: &Path,
    mode: ImportMode,
) -> Result<String> {
    let source = source_path.to_path_buf();
    if !source.exists() {
        return Err(anyhow!("Source path does not exist: {}", source.display()));
    }
    match mode {
        ImportMode::Copy => {
            // Resolve first: if `source_path` itself goes through the symlink
            // at `target_path`, removing that link below must not lose track
            // of the real database.
            let resolved_source = tokio::fs::canonicalize(&source).await?;
            if let Some(parent) = target_path.parent() {
                tokio::fs::create_dir_all(parent).await?;
            }
            // A link left by an earlier Symlink import points at the source;
            // copying through it would overwrite the source files in place.
            if target_path.is_symlink() {
                remove_import_target(target_path).await?;
            }
            let resolved_target = resolve_import_target(target_path).await?;
            // Copying into the source (or a sub-directory of it) would copy
            // files onto themselves or recurse into freshly written files.
            if resolved_target.starts_with(&resolved_source) {
                return Err(anyhow!(
                    "Target {} is inside source {}; refusing to copy a database into itself",
                    target_path.display(),
                    source.display()
                ));
            }
            copy_dir_recursive(&resolved_source, target_path).await?;
            Ok(format!("Copied database to {}", target_path.display()))
        }
        ImportMode::Symlink => {
            let resolved_source = tokio::fs::canonicalize(&source).await?;
            if let Some(parent) = target_path.parent() {
                tokio::fs::create_dir_all(parent).await?;
            }
            if target_path.exists() || target_path.is_symlink() {
                // Deleting a real directory that is (or contains) the source
                // would destroy the database we are about to link to.
                if !target_path.is_symlink() {
                    let resolved_target = tokio::fs::canonicalize(target_path).await?;
                    if resolved_source.starts_with(&resolved_target) {
                        return Err(anyhow!(
                            "Refusing to replace {}: it is or contains the source database {}",
                            target_path.display(),
                            source.display()
                        ));
                    }
                }
                remove_import_target(target_path).await?;
            }
            #[cfg(unix)]
            tokio::fs::symlink(&resolved_source, target_path).await?;
            #[cfg(windows)]
            tokio::fs::symlink_dir(&resolved_source, target_path).await?;
            Ok(format!(
                "Created symlink {} -> {}",
                target_path.display(),
                resolved_source.display()
            ))
        }
        ImportMode::ConfigOnly => Ok(format!(
            "Config will use existing database at {}",
            source.display()
        )),
    }
}

/// Removes whatever occupies `target`. For a symlink this removes the link
/// itself, not what it points to: `std::fs::remove_dir_all` documents that it
/// does not follow symlinks, and `remove_file` unlinks the link.
async fn remove_import_target(target: &Path) -> Result<()> {
    if target.is_dir() {
        tokio::fs::remove_dir_all(target).await?;
    } else {
        tokio::fs::remove_file(target).await?;
    }
    Ok(())
}

/// Best-effort canonical form of `target`, which may not exist yet. It
/// canonicalizes the existing path, or else its (existing) parent joined with
/// the final component.
async fn resolve_import_target(target: &Path) -> Result<PathBuf> {
    if target.exists() {
        return Ok(tokio::fs::canonicalize(target).await?);
    }
    let parent = match target.parent() {
        Some(p) if !p.as_os_str().is_empty() => p,
        _ => Path::new("."),
    };
    let resolved_parent = tokio::fs::canonicalize(parent).await?;
    Ok(match target.file_name() {
        Some(name) => resolved_parent.join(name),
        None => resolved_parent,
    })
}
async fn copy_dir_recursive(src: &Path, dst: &Path) -> Result<()> {
use crate::path_utils::validate_write_path;
let (safe_src, mut entries) = crate::path_utils::safe_read_dir(src).await?;
let safe_dst = validate_write_path(dst)?;
tokio::fs::create_dir_all(&safe_dst).await?;
let _ = &safe_src; while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
let entry_name = entry.file_name();
let entry_name_str = entry_name.to_string_lossy();
if entry_name_str.contains("..")
|| entry_name_str.starts_with('.')
&& entry_name_str.len() > 1
&& entry_name_str.chars().nth(1) == Some('.')
{
continue; }
let dest_path = safe_dst.join(&entry_name);
if path.is_dir() {
Box::pin(copy_dir_recursive(&path, &dest_path)).await?;
} else {
if let Ok(_dst) = crate::path_utils::safe_copy(&path, &dest_path) {
}
}
}
Ok(())
}
pub fn start_indexing(
dir_path: PathBuf,
namespace: String,
embedding_config: crate::EmbeddingConfig,
db_path: String,
) -> mpsc::Receiver<IndexProgress> {
let (tx, rx) = mpsc::channel(100);
tokio::spawn(async move {
let result = run_indexing(dir_path, namespace, embedding_config, db_path, tx.clone()).await;
if let Err(e) = result {
let mut progress = IndexProgress::new(0);
progress.error = Some(e.to_string());
progress.complete = true;
let _ = tx.send(progress).await;
}
});
rx
}
async fn run_indexing(
dir_path: PathBuf,
namespace: String,
embedding_config: crate::EmbeddingConfig,
db_path: String,
tx: mpsc::Sender<IndexProgress>,
) -> Result<()> {
use crate::{EmbeddingClient, RAGPipeline, StorageManager};
use std::sync::Arc;
use tokio::sync::Mutex;
let files = collect_indexable_files(&dir_path)?;
let total = files.len();
if total == 0 {
let mut progress = IndexProgress::new(0);
progress.complete = true;
progress.error = Some("No indexable files found in directory".to_string());
let _ = tx.send(progress).await;
return Ok(());
}
let mut progress = IndexProgress::new(total);
let _ = tx.send(progress.clone()).await;
let expanded_db_path = shellexpand::tilde(&db_path).to_string();
let storage = Arc::new(StorageManager::new_lance_only(&expanded_db_path).await?);
storage.ensure_collection().await?;
let embedding_client = Arc::new(Mutex::new(EmbeddingClient::new(&embedding_config).await?));
let pipeline = RAGPipeline::new(embedding_client, storage).await?;
for (i, file_path) in files.iter().enumerate() {
progress.current_file = file_path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("unknown")
.to_string();
progress.processed = i;
let _ = tx.send(progress.clone()).await;
match pipeline
.index_document_with_dedup(file_path, Some(&namespace), crate::SliceMode::Onion)
.await
{
Ok(result) => {
if result.is_skipped() {
progress.skipped += 1;
}
}
Err(e) => {
tracing::warn!("Failed to index {}: {}", file_path.display(), e);
progress.skipped += 1;
}
}
}
progress.processed = total;
progress.complete = true;
progress.current_file = "Complete".to_string();
let _ = tx.send(progress).await;
Ok(())
}
/// Rejects blank input, otherwise delegates to
/// `path_utils::sanitize_existing_path`, passing the original (untrimmed)
/// string.
///
/// # Errors
///
/// Returns "Path cannot be empty" for blank input, or whatever
/// `sanitize_existing_path` returns.
pub fn validate_path(path: &str) -> Result<PathBuf> {
    use crate::path_utils::sanitize_existing_path;
    match path.trim() {
        "" => Err(anyhow!("Path cannot be empty")),
        _ => sanitize_existing_path(path),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_progress_bar() {
        let mut progress = IndexProgress::new(100);
        progress.processed = 50;
        let bar = progress.progress_bar(20);
        assert!(bar.contains("50%"));
        assert!(bar.contains("50/100"));
        // Half of a 20-cell bar is filled.
        assert!(bar.starts_with(&format!("[{}{}]", "=".repeat(10), " ".repeat(10))));
    }

    #[test]
    fn test_progress_empty_total() {
        assert_eq!(IndexProgress::new(0).percent(), 0);
    }

    #[test]
    fn test_data_setup_options() {
        let options = DataSetupOption::all();
        assert_eq!(options.len(), 3);
    }

    #[test]
    fn test_state_transitions() {
        let mut state = DataSetupState::new();
        assert_eq!(state.sub_step, DataSetupSubStep::SelectOption);
        state.focus = 1;
        state.select_focused();
        assert_eq!(state.option, DataSetupOption::IndexDirectory);
        assert_eq!(state.sub_step, DataSetupSubStep::EnterPath);
        assert!(state.input_mode);
    }

    #[test]
    fn test_index_directory_flow_prefills_namespace() {
        let mut state = DataSetupState::new();
        state.focus = 1;
        state.select_focused();

        // Blank input is ignored.
        state.input_buffer = "   ".to_string();
        state.confirm_path();
        assert_eq!(state.sub_step, DataSetupSubStep::EnterPath);

        state.input_buffer = "/tmp/docs/".to_string();
        state.confirm_path();
        assert_eq!(state.source_path.as_deref(), Some("/tmp/docs/"));
        assert_eq!(state.sub_step, DataSetupSubStep::EnterNamespace);
        assert_eq!(state.input_buffer, "kb:docs");
        assert!(state.input_mode);

        state.confirm_namespace();
        assert_eq!(state.namespace.as_deref(), Some("kb:docs"));
        assert!(!state.input_mode);
        assert!(state.is_indexing());
    }

    #[test]
    fn test_import_flow_and_skip() {
        let mut state = DataSetupState::new();
        state.focus = 0;
        state.select_focused();
        state.input_buffer = "/data/lance".to_string();
        state.focus = 2;
        state.confirm_path();
        assert_eq!(state.sub_step, DataSetupSubStep::SelectImportMode);
        assert_eq!(state.focus, 0);
        state.select_import_mode(ImportMode::Copy);
        assert_eq!(state.import_mode, ImportMode::Copy);
        assert!(state.is_done());

        let mut skipped = DataSetupState::new();
        skipped.focus = 2;
        skipped.select_focused();
        assert!(skipped.is_done());
        assert!(!skipped.input_mode);
    }

    #[test]
    fn test_validate_path() {
        assert!(validate_path("").is_err());
        assert!(validate_path(" ").is_err());
        assert!(validate_path("/this/path/does/not/exist/ever").is_err());
        // Tilde handling is up to `sanitize_existing_path`; this is only a
        // smoke check that it does not panic.
        let _ = validate_path("~");
    }
}