use anyhow::{Context, Result};
use chrono;
use duckdb::Connection;
use tracing::info;
/// Wrapper around an in-memory DuckDB connection with the Flock community
/// extension loaded, exposing LLM operations (text completion, embeddings,
/// filtering, summarization) that are executed by an Ollama server.
pub struct FlockManager {
// In-memory DuckDB connection; temporary tables and prompts created by the
// methods below exist only within this session.
conn: Connection,
}
impl FlockManager {
pub fn new() -> Result<Self> {
let conn = Connection::open_in_memory().context("Failed to create DuckDB connection")?;
conn.execute_batch("INSTALL flock FROM community; LOAD flock;")
.context("Failed to load Flock extension")?;
Ok(Self { conn })
}
/// Registers the Ollama secret and the text/embedding models with Flock.
///
/// # Parameters
/// - `ollama_url`: base URL of the Ollama server (e.g. `127.0.0.1:11434`).
/// - `text_model`: Ollama model used for text generation.
/// - `embedding_model`: Ollama model used for embeddings.
/// - `skip_verification`: when `true`, skips the (currently log-only)
///   model-availability check.
///
/// # Errors
/// Currently never returns an error: failures to create the secret or a
/// model are logged and treated as "already exists".
pub fn setup_ollama(
    &self,
    ollama_url: &str,
    text_model: &str,
    embedding_model: &str,
    skip_verification: bool,
) -> Result<()> {
    // Quote a value as a SQL string literal. CREATE SECRET / CREATE MODEL
    // are DDL/parser-extension statements and DuckDB does not support `?`
    // placeholders in them, so values must be spliced in as escaped
    // literals (the previous parameterized form failed on every call).
    fn quote(s: &str) -> String {
        format!("'{}'", s.replace('\'', "''"))
    }
    info!("🔧 Setting up Ollama integration for Flock LLM operations");
    info!(" Ollama URL: {}", ollama_url);
    info!(" Text model: {}", text_model);
    info!(" Embedding model: {}", embedding_model);
    let secret_result = self.conn.execute(
        &format!(
            "CREATE SECRET ollama_secret (TYPE OLLAMA, API_URL {})",
            quote(ollama_url)
        ),
        [],
    );
    if let Err(e) = secret_result {
        // Re-running setup against the same session hits an existing secret.
        info!("ℹ️ Secret might already exist: {}", e);
    } else {
        info!("✅ Created Ollama secret");
    }
    // Register one alias per role; both use the same Ollama provider config.
    let models = [
        ("text_generator", text_model),
        ("embedder", embedding_model),
    ];
    for (model_alias, model_spec) in &models {
        let model_result = self.conn.execute(
            &format!(
                "CREATE MODEL({}, {}, 'ollama', {{'tuple_format': 'json', 'batch_size': 32, 'model_parameters': {{'temperature': 0.7}}}})",
                quote(model_alias),
                quote(model_spec)
            ),
            [],
        );
        if let Err(e) = model_result {
            info!("ℹ️ Model '{}' might already exist: {}", model_alias, e);
        } else {
            info!("✅ Created model: {} ({})", model_alias, model_spec);
        }
    }
    if !skip_verification {
        // NOTE(review): verification is log-only today — no actual probe of
        // the Ollama server is performed here.
        info!("🔍 Verifying model availability...");
        info!("✅ Model verification completed");
    }
    info!("🎉 Ollama setup complete! Ready for LLM operations.");
    Ok(())
}
/// Generates a text completion for `prompt` using the registered `model`
/// alias via Flock's `llm_complete`.
///
/// # Errors
/// Fails if the Flock extension is not loaded, the temporary prompt cannot
/// be created, or the completion query fails (e.g. Ollama unreachable).
pub fn complete_text(
    &self,
    prompt: &str,
    model: &str,
) -> Result<String> {
    info!("🤖 Generating text completion for prompt: {} using model: {}", prompt, model);
    if !self.is_flock_ready()? {
        return Err(anyhow::anyhow!("Flock extension not available. Run setup first."));
    }
    // Timestamp-suffixed name keeps concurrent/repeated calls from colliding.
    let prompt_name = format!("temp_prompt_{}", chrono::Utc::now().timestamp());
    let prompt_content = format!("Complete this text: {}", prompt);
    // CREATE PROMPT is DDL and does not accept `?` placeholders; splice the
    // generated name and the single-quote-escaped content as literals
    // (the previous parameterized form failed on every call).
    self.conn.execute(
        &format!(
            "CREATE PROMPT('{}', '{}')",
            prompt_name,
            prompt_content.replace('\'', "''")
        ),
        [],
    )?;
    let result = self.conn.query_row(
        "SELECT llm_complete({'model_name': ?}, {'prompt_name': ?})",
        [model, &prompt_name],
        |row| row.get::<_, String>(0),
    );
    // Best-effort cleanup so temporary prompts don't accumulate in the
    // session, even when the completion itself failed.
    let _ = self
        .conn
        .execute(&format!("DROP PROMPT IF EXISTS '{}'", prompt_name), []);
    let result = result
        .context("Failed to generate text completion - check if Ollama is running and models are available")?;
    info!("✅ Text completion generated ({} chars)", result.len());
    Ok(result)
}
/// Computes embeddings for `texts` with Flock's `llm_embedding` into a
/// temporary table.
///
/// NOTE: reading the embeddings back requires parsing DuckDB LIST/array
/// columns into `Vec<f32>`, which is not implemented yet — after running
/// the embedding query and cleaning up, this function always returns an
/// error. (The previous version obscured this behind an unused prepared
/// statement and an always-empty result vector.)
///
/// # Errors
/// Always errors: either an operational failure (Flock not loaded, SQL
/// failure, embedder model unavailable) or the final not-implemented error.
pub fn generate_embeddings(
    &self,
    texts: Vec<String>,
    model: &str,
    normalize: bool,
) -> Result<Vec<Vec<f32>>> {
    info!("🧠 Generating embeddings for {} texts using model: {}", texts.len(), model);
    if !self.is_flock_ready()? {
        return Err(anyhow::anyhow!("Flock extension not available. Run setup first."));
    }
    // Timestamp-suffixed, internally generated table name — safe to splice
    // into SQL (identifiers cannot be bound as `?` parameters).
    let table_name = format!("temp_texts_{}", chrono::Utc::now().timestamp());
    self.conn.execute(
        &format!("CREATE TABLE {} (id INTEGER, content TEXT)", table_name),
        [],
    )?;
    for (i, text) in texts.iter().enumerate() {
        self.conn.execute(
            &format!("INSERT INTO {} VALUES (?, ?)", table_name),
            [&(i as i32).to_string(), text],
        )?;
    }
    let embedding_table = format!("{}_embeddings", table_name);
    let normalize_clause = if normalize { "true" } else { "false" };
    self.conn.execute(
        &format!(
            "CREATE TABLE {} AS
             SELECT id, content,
                    llm_embedding({{'model_name': '{}'}}, {{'context_columns': [{{'data': content}}]}}, {}) as embedding
             FROM {}",
            embedding_table, model, normalize_clause, table_name
        ),
        [],
    ).context("Failed to generate embeddings - check if embedder model is available in Ollama")?;
    // Clean up both temporary tables before reporting the unimplemented
    // read-back path.
    self.conn.execute(&format!("DROP TABLE IF EXISTS {}", table_name), [])?;
    self.conn.execute(&format!("DROP TABLE IF EXISTS {}", embedding_table), [])?;
    Err(anyhow::anyhow!("Embedding generation not fully implemented - requires parsing of DuckDB array columns"))
}
/// Placeholder for semantic search over a pre-embedded corpus.
///
/// After confirming the Flock extension is loaded, this always returns a
/// not-implemented error; callers must first build corpus embeddings via
/// `generate_embeddings()`.
pub fn semantic_search(
    &self,
    query: &str,
    _corpus: &str,
    _threshold: f32,
    _limit: usize,
) -> Result<Vec<(String, f32)>> {
    info!("🔍 Performing semantic search for: {}", query);
    // Distinguish "extension missing" from "feature not built yet".
    match self.is_flock_ready()? {
        false => Err(anyhow::anyhow!("Flock extension not available. Run setup first.")),
        true => Err(anyhow::anyhow!(
            "Semantic search not implemented - requires pre-computed embeddings and similarity comparison. \
            Use generate_embeddings() first to create embeddings for your corpus."
        )),
    }
}
/// Classifies each line of `input_file` against `criteria` using
/// `llm_complete`, returning `(line, matched)` pairs.
///
/// # Parameters
/// - `criteria`: natural-language classification criteria.
/// - `input_file`: path to a text file; each line is one item.
/// - `model`: registered model alias to run the classification with.
/// - `positive_only`: when `true`, only matching items are returned.
///
/// # Errors
/// Fails if Flock is not loaded, the file cannot be read, or the prompt
/// cannot be created. Per-item LLM failures are treated as non-matches
/// rather than aborting the batch.
pub fn llm_filter(
    &self,
    criteria: &str,
    input_file: &str,
    model: &str,
    positive_only: bool,
) -> Result<Vec<(String, bool)>> {
    info!("🎯 Filtering data with criteria: {} using model: {}", criteria, model);
    if !self.is_flock_ready()? {
        return Err(anyhow::anyhow!("Flock extension not available. Run setup first."));
    }
    let content = std::fs::read_to_string(input_file)
        .context("Failed to read input file for filtering")?;
    let items: Vec<&str> = content.lines().collect();
    // (The previous version also created and populated a temp table that no
    // query ever read; that dead work has been removed.)
    let prompt_name = format!("filter_prompt_{}", chrono::Utc::now().timestamp());
    let prompt_content = format!("Classify this text based on the criteria: {}. Return only 'true' or 'false'.", criteria);
    // CREATE PROMPT is DDL and does not accept `?` placeholders; splice the
    // generated name and the single-quote-escaped content as literals.
    self.conn.execute(
        &format!(
            "CREATE PROMPT('{}', '{}')",
            prompt_name,
            prompt_content.replace('\'', "''")
        ),
        [],
    )?;
    let mut results = Vec::new();
    // Track matches separately: `results` also holds non-matches when
    // `positive_only` is false, so its length is not the match count.
    let mut match_count = 0usize;
    for item in &items {
        let verdict: String = self.conn.query_row(
            "SELECT llm_complete({'model_name': ?}, {'prompt_name': ?, 'context_columns': [{'data': ?}]})",
            [model, &prompt_name, &item.to_string()],
            |row| row.get(0),
        ).unwrap_or_else(|_| "false".to_string());
        let matches = verdict.to_lowercase().contains("true");
        if matches {
            match_count += 1;
        }
        if !positive_only || matches {
            results.push((item.to_string(), matches));
        }
    }
    // Best-effort cleanup; DROP PROMPT is DDL as well, so the name is
    // spliced in rather than bound (the old `?` form silently failed and
    // leaked the prompt).
    let _ = self
        .conn
        .execute(&format!("DROP PROMPT IF EXISTS '{}'", prompt_name), []);
    info!("✅ Filtered {} items, {} matches found", items.len(), match_count);
    Ok(results)
}
/// Summarizes `texts` with the chosen `strategy`:
/// - `"reduce"`: one hierarchical `llm_reduce` call over all rows;
/// - `"map"`: summarize each text independently and join the results;
/// - anything else: concatenate all texts and summarize once.
///
/// # Errors
/// Fails if Flock is not loaded, `texts` is empty, or the underlying
/// SQL/LLM call fails ("map" falls back to the original text per item).
pub fn summarize_texts(
    &self,
    texts: Vec<String>,
    strategy: &str,
    max_length: usize,
    model: &str,
) -> Result<String> {
    info!("📝 Generating summary using {} strategy with model: {}", strategy, model);
    if !self.is_flock_ready()? {
        return Err(anyhow::anyhow!("Flock extension not available. Run setup first."));
    }
    if texts.is_empty() {
        return Err(anyhow::anyhow!("Cannot summarize empty text collection"));
    }
    // Timestamp-suffixed, internally generated name — safe to splice into
    // SQL (identifiers cannot be bound as `?` parameters).
    let table_name = format!("temp_summary_{}", chrono::Utc::now().timestamp());
    self.conn.execute(
        &format!("CREATE TABLE {} (id INTEGER, content TEXT)", table_name),
        [],
    )?;
    for (i, text) in texts.iter().enumerate() {
        self.conn.execute(
            &format!("INSERT INTO {} VALUES (?, ?)", table_name),
            [&(i as i32).to_string(), text],
        )?;
    }
    let prompt_name = format!("summary_prompt_{}", chrono::Utc::now().timestamp());
    let prompt_content = format!("Summarize the following text in {} words or less. Focus on the key points and main ideas.", max_length);
    // CREATE PROMPT is DDL and does not accept `?` placeholders; splice the
    // generated name and the single-quote-escaped content as literals.
    self.conn.execute(
        &format!(
            "CREATE PROMPT('{}', '{}')",
            prompt_name,
            prompt_content.replace('\'', "''")
        ),
        [],
    )?;
    let summary = match strategy {
        "reduce" => {
            // A table name cannot be bound as a `?` parameter (the old
            // `FROM ?` could never execute) — interpolate it instead.
            let result: String = self.conn.query_row(
                &format!(
                    "SELECT llm_reduce({{'model_name': ?}}, {{'prompt_name': ?, 'context_columns': [{{'data': content}}]}}) FROM {}",
                    table_name
                ),
                [model, &prompt_name],
                |row| row.get(0),
            ).context("Failed to generate hierarchical summary")?;
            result
        },
        "map" => {
            // Summarize each text on its own; on failure keep the original
            // text so one bad item doesn't abort the batch.
            let mut summaries = Vec::new();
            for text in &texts {
                let summary: String = self.conn.query_row(
                    "SELECT llm_complete({'model_name': ?}, {'prompt_name': ?, 'context_columns': [{'data': ?}]})",
                    [model, &prompt_name, text.as_str()],
                    |row| row.get(0),
                ).unwrap_or_else(|_| text.clone());
                summaries.push(summary);
            }
            summaries.join(" ")
        },
        _ => {
            // Default strategy: one-shot summary of all texts concatenated.
            let combined_text = texts.join(" ");
            let result: String = self.conn.query_row(
                "SELECT llm_complete({'model_name': ?}, {'prompt_name': ?, 'context_columns': [{'data': ?}]})",
                [model, &prompt_name, combined_text.as_str()],
                |row| row.get(0),
            ).context("Failed to generate summary")?;
            result
        }
    };
    // Best-effort cleanup of the temp table and prompt; DROP PROMPT is DDL,
    // so the name is spliced in (the old `?` form silently failed).
    let _ = self.conn.execute(&format!("DROP TABLE IF EXISTS {}", table_name), []);
    let _ = self
        .conn
        .execute(&format!("DROP PROMPT IF EXISTS '{}'", prompt_name), []);
    info!("✅ Generated summary ({} chars)", summary.len());
    Ok(summary)
}
/// Reports whether the Flock extension is loaded in this session.
///
/// # Errors
/// Fails if the catalog query or Flock's `GET MODELS` statement errors.
pub fn is_flock_ready(&self) -> Result<bool> {
    // duckdb_extensions() lists every known extension regardless of state,
    // so also require the `loaded` flag — matching on the name alone would
    // report "ready" for an installed-but-not-loaded extension.
    let loaded: Vec<String> = self
        .conn
        .prepare(
            "SELECT extension_name FROM duckdb_extensions() WHERE extension_name = 'flock' AND loaded",
        )?
        .query_map([], |row| row.get(0))?
        .collect::<Result<Vec<_>, _>>()?;
    if loaded.is_empty() {
        info!("❌ Flock extension not loaded");
        return Ok(false);
    }
    // GET MODELS is Flock's statement for listing registered models; only
    // the count is reported here.
    let models: Vec<String> = self
        .conn
        .prepare("GET MODELS")?
        .query_map([], |row| row.get(0))?
        .collect::<Result<Vec<_>, _>>()?;
    info!("✅ Flock ready with {} models available", models.len());
    Ok(true)
}
}