use crate::indexer::IndexMiddleware;
use crate::Result;
use reqwest::Client;
use serde::Deserialize;
use std::time::Duration;
use terraphim_config::Haystack;
use terraphim_persistence::Persistable;
use terraphim_types::Index;
/// Subset of the JSON body returned by Quickwit's search endpoint
/// (`{base_url}/api/v1/{index}/search`, as built by `build_search_url`).
#[derive(Debug, Deserialize)]
struct QuickwitSearchResponse {
    /// Total number of matching documents reported by Quickwit (only logged in this file).
    num_hits: u64,
    /// Raw hit documents; each one is converted by `hit_to_document`.
    hits: Vec<serde_json::Value>,
    /// Server-side search time in microseconds (only logged in this file).
    elapsed_time_micros: u64,
    /// Error messages reported alongside the hits; `serde(default)` yields an empty
    /// list when the response omits the field.
    #[serde(default)]
    // dead_code is allowed so the field can stay deserialized even when no code path reads it.
    #[allow(dead_code)]
    errors: Vec<String>,
}
/// Identity of one Quickwit index, taken from the `index_config.index_id` field of each
/// entry returned by `{base_url}/api/v1/indexes` (see `fetch_available_indexes`).
#[derive(Debug, Deserialize, Clone)]
struct QuickwitIndexInfo {
    /// Index id used in search URLs and in generated document ids.
    index_id: String,
}
/// Connection and query settings for one Quickwit haystack, parsed from the haystack's
/// `extra_parameters` by `parse_config`.
#[derive(Debug, Clone)]
struct QuickwitConfig {
    /// Value sent verbatim as the `Authorization` header; takes precedence over basic auth.
    auth_token: Option<String>,
    /// Basic-auth user name; only used when `auth_password` is also set and no token is.
    auth_username: Option<String>,
    /// Basic-auth password; only used together with `auth_username`.
    auth_password: Option<String>,
    /// When set, only this index is searched and index discovery is skipped.
    default_index: Option<String>,
    /// Optional exact name or `*` glob applied to discovered index ids.
    index_filter: Option<String>,
    /// `max_hits` query parameter for each search request (`parse_config` default: 100).
    max_hits: u64,
    /// Per-request timeout in seconds (`parse_config` default: 10).
    timeout_seconds: u64,
    /// `sort_by` query parameter for each search request (`parse_config` default: `-timestamp`).
    sort_by: String,
}
/// Haystack indexer that queries Quickwit indexes over HTTP and converts the returned
/// hits into Terraphim documents.
#[derive(Debug, Clone)]
pub struct QuickwitHaystackIndexer {
    /// Shared HTTP client used for index discovery and search requests.
    client: Client,
}
impl Default for QuickwitHaystackIndexer {
fn default() -> Self {
let client = Client::builder()
.timeout(std::time::Duration::from_secs(10))
.user_agent("Terraphim/1.0 (Quickwit integration)")
.build()
.unwrap_or_else(|_| Client::new());
Self { client }
}
}
impl QuickwitHaystackIndexer {
    /// Maximum number of message characters kept in a document title.
    const TITLE_MAX_CHARS: usize = 100;
    /// Maximum number of message characters kept in a document description.
    const DESCRIPTION_MAX_CHARS: usize = 200;

    /// Builds a [`QuickwitConfig`] from `haystack.extra_parameters`.
    ///
    /// Missing or unparsable numeric values fall back to `max_hits = 100` and
    /// `timeout_seconds = 10`; a missing `sort_by` falls back to `-timestamp`.
    fn parse_config(&self, haystack: &Haystack) -> QuickwitConfig {
        let params = &haystack.extra_parameters;
        QuickwitConfig {
            auth_token: params.get("auth_token").cloned(),
            auth_username: params.get("auth_username").cloned(),
            auth_password: params.get("auth_password").cloned(),
            default_index: params.get("default_index").cloned(),
            index_filter: params.get("index_filter").cloned(),
            max_hits: params
                .get("max_hits")
                .and_then(|v| v.parse().ok())
                .unwrap_or(100),
            timeout_seconds: params
                .get("timeout_seconds")
                .and_then(|v| v.parse().ok())
                .unwrap_or(10),
            sort_by: params
                .get("sort_by")
                .cloned()
                .unwrap_or_else(|| "-timestamp".to_string()),
        }
    }

    /// Lists the index ids exposed by `GET {base_url}/api/v1/indexes`.
    ///
    /// Best effort: connection failures, non-success statuses and unparsable bodies are
    /// logged and yield an empty list. Entries without `index_config.index_id` are skipped.
    /// Like the search requests, this request honours `config.timeout_seconds`.
    async fn fetch_available_indexes(
        &self,
        base_url: &str,
        config: &QuickwitConfig,
    ) -> Vec<QuickwitIndexInfo> {
        // Trim like `build_search_url` does so a trailing `/` in the location
        // does not produce `//api/v1/...`.
        let url = format!("{}/api/v1/indexes", base_url.trim_end_matches('/'));
        log::debug!("Fetching available Quickwit indexes from: {}", url);
        let request = self
            .add_auth_header(self.client.get(&url), config)
            .timeout(Duration::from_secs(config.timeout_seconds));

        let response = match request.send().await {
            Ok(response) => response,
            Err(e) => {
                log::warn!("Failed to connect to Quickwit for index discovery: {}", e);
                return Vec::new();
            }
        };
        if !response.status().is_success() {
            log::warn!(
                "Failed to fetch Quickwit indexes, status: {}",
                response.status()
            );
            return Vec::new();
        }
        let indexes = match response.json::<Vec<serde_json::Value>>().await {
            Ok(indexes) => indexes,
            Err(e) => {
                log::warn!("Failed to parse Quickwit indexes response: {}", e);
                return Vec::new();
            }
        };

        let available: Vec<QuickwitIndexInfo> = indexes
            .iter()
            .filter_map(|idx| {
                idx.get("index_config")?
                    .get("index_id")?
                    .as_str()
                    .map(|id| QuickwitIndexInfo {
                        index_id: id.to_string(),
                    })
            })
            .collect();
        log::info!(
            "Discovered {} Quickwit indexes: {:?}",
            available.len(),
            available.iter().map(|i| &i.index_id).collect::<Vec<_>>()
        );
        available
    }

    /// Attaches authentication to `request` according to `config`.
    ///
    /// A configured `auth_token` is sent verbatim as the `Authorization` header (so any
    /// scheme prefix such as `Bearer ` must be part of the value) and takes precedence.
    /// Otherwise basic auth is used when both username and password are set. With
    /// neither configured, the request is returned unchanged.
    fn add_auth_header(
        &self,
        request: reqwest::RequestBuilder,
        config: &QuickwitConfig,
    ) -> reqwest::RequestBuilder {
        if let Some(token) = &config.auth_token {
            return request.header("Authorization", token);
        }
        if let (Some(username), Some(password)) = (&config.auth_username, &config.auth_password) {
            return request.basic_auth(username, Some(password));
        }
        request
    }

    /// Keeps the indexes whose id matches `pattern`.
    ///
    /// A pattern without `*` must equal the index id exactly. Otherwise `*` matches any
    /// (possibly empty) run of characters, anywhere in the pattern.
    fn filter_indexes(
        &self,
        indexes: Vec<QuickwitIndexInfo>,
        pattern: &str,
    ) -> Vec<QuickwitIndexInfo> {
        if !pattern.contains('*') {
            return indexes
                .into_iter()
                .filter(|idx| idx.index_id == pattern)
                .collect();
        }
        let filtered: Vec<QuickwitIndexInfo> = indexes
            .into_iter()
            .filter(|idx| self.matches_glob(&idx.index_id, pattern))
            .collect();
        log::debug!(
            "Filtered indexes with pattern '{}': {} matches",
            pattern,
            filtered.len()
        );
        filtered
    }

    /// Returns whether `text` matches `pattern`, where `*` matches any (possibly empty)
    /// substring and every other character matches itself.
    ///
    /// Supports wildcards in any position, e.g. `workers-*`, `*-logs`, `*logs*` and
    /// `workers-*-logs`. A pattern without `*` matches only the identical text.
    fn matches_glob(&self, text: &str, pattern: &str) -> bool {
        let mut segments = pattern.split('*');
        // `split` always yields at least one segment; the first one is a literal prefix.
        let head = segments.next().unwrap_or("");
        let Some(mut remainder) = text.strip_prefix(head) else {
            return false;
        };
        let rest: Vec<&str> = segments.collect();
        let Some((last, middle)) = rest.split_last() else {
            // No `*` at all: the pattern must equal the whole text.
            return remainder.is_empty();
        };
        for &segment in middle {
            // Taking the leftmost occurrence is always safe for `*`-only globs: it leaves
            // the longest possible remainder for the segments that follow.
            match remainder.find(segment) {
                Some(pos) => remainder = &remainder[pos + segment.len()..],
                None => return false,
            }
        }
        // The final literal must end the text without overlapping consumed segments.
        remainder.ends_with(last)
    }

    /// Searches one index for `needle` and converts the hits into an [`Index`].
    ///
    /// Best effort: connection failures, non-success statuses and unparsable responses are
    /// logged and produce an empty `Index`; this function never returns `Err`. Errors that
    /// Quickwit reports inside a successful response are logged as warnings.
    async fn search_single_index(
        &self,
        needle: &str,
        index: &str,
        base_url: &str,
        config: &QuickwitConfig,
    ) -> Result<Index> {
        let url = self.build_search_url(base_url, index, needle, config);
        log::debug!("Searching Quickwit index '{}': {}", index, url);
        let request = self
            .add_auth_header(self.client.get(&url), config)
            .timeout(Duration::from_secs(config.timeout_seconds));

        let response = match request.send().await {
            Ok(response) => response,
            Err(e) => {
                log::warn!("Failed to connect to Quickwit for index '{}': {}", index, e);
                return Ok(Index::new());
            }
        };
        if !response.status().is_success() {
            log::warn!(
                "Quickwit search failed for index '{}' with status: {}",
                index,
                response.status()
            );
            return Ok(Index::new());
        }
        let search_response = match response.json::<QuickwitSearchResponse>().await {
            Ok(parsed) => parsed,
            Err(e) => {
                log::warn!(
                    "Failed to parse Quickwit search response for index '{}': {}",
                    index,
                    e
                );
                return Ok(Index::new());
            }
        };

        log::info!(
            "Quickwit index '{}' returned {} hits in {}µs",
            index,
            search_response.num_hits,
            search_response.elapsed_time_micros
        );
        if !search_response.errors.is_empty() {
            log::warn!(
                "Quickwit index '{}' reported search errors: {:?}",
                index,
                search_response.errors
            );
        }

        let mut result_index = Index::new();
        for (position, hit) in search_response.hits.iter().enumerate() {
            if let Some(doc) = self.hit_to_document(hit, index, base_url, position) {
                result_index.insert(doc.id.clone(), doc);
            }
        }
        Ok(result_index)
    }

    /// Builds `{base_url}/api/v1/{index}/search?query=…&max_hits=…&sort_by=…`.
    ///
    /// A trailing `/` on `base_url` is dropped. The index, query and sort expression are
    /// percent-encoded, so user-supplied values containing `&`, `#`, spaces, etc. cannot
    /// corrupt the URL.
    fn build_search_url(
        &self,
        base_url: &str,
        index: &str,
        query: &str,
        config: &QuickwitConfig,
    ) -> String {
        format!(
            "{}/api/v1/{}/search?query={}&max_hits={}&sort_by={}",
            base_url.trim_end_matches('/'),
            urlencoding::encode(index),
            urlencoding::encode(query),
            config.max_hits,
            urlencoding::encode(&config.sort_by)
        )
    }

    /// Returns at most `max_chars` characters of `text`, with `...` appended when
    /// anything was cut.
    ///
    /// Counts `char`s rather than bytes: byte slicing such as `&text[..100]` panics when
    /// the cut falls inside a multi-byte UTF-8 character.
    fn truncate_with_ellipsis(text: &str, max_chars: usize) -> String {
        match text.char_indices().nth(max_chars) {
            Some((byte_idx, _)) => format!("{}...", &text[..byte_idx]),
            None => text.to_string(),
        }
    }

    /// Converts one Quickwit hit into a Terraphim document.
    ///
    /// Reads the string fields `timestamp`, `level` (default `INFO`), `message`, `service`
    /// and `_id`. When `_id` is absent, the hit's position `hit_index` is used instead.
    /// The body is the pretty-printed hit JSON. Tags always include `quickwit` and `logs`,
    /// plus the level when it is not `INFO` and the service when present. Always returns
    /// `Some`.
    fn hit_to_document(
        &self,
        hit: &serde_json::Value,
        index_name: &str,
        base_url: &str,
        hit_index: usize,
    ) -> Option<terraphim_types::Document> {
        let timestamp_str = hit.get("timestamp").and_then(|v| v.as_str()).unwrap_or("");
        let level = hit.get("level").and_then(|v| v.as_str()).unwrap_or("INFO");
        let message = hit.get("message").and_then(|v| v.as_str()).unwrap_or("");
        let service = hit.get("service").and_then(|v| v.as_str()).unwrap_or("");
        let quickwit_doc_id = hit
            .get("_id")
            .and_then(|v| v.as_str())
            .map(str::to_string)
            .unwrap_or_else(|| hit_index.to_string());
        let doc_id = self.normalize_document_id(index_name, &quickwit_doc_id);

        let title = if message.is_empty() {
            format!("[{}] {} - {}", index_name, level, timestamp_str)
        } else {
            format!(
                "[{}] {}",
                level,
                Self::truncate_with_ellipsis(message, Self::TITLE_MAX_CHARS)
            )
        };
        let description = if message.is_empty() {
            format!("{} - {} log entry", timestamp_str, level)
        } else {
            format!(
                "{} - {}",
                timestamp_str,
                Self::truncate_with_ellipsis(message, Self::DESCRIPTION_MAX_CHARS)
            )
        };

        let body = serde_json::to_string_pretty(hit).unwrap_or_else(|_| "{}".to_string());
        // The doc id comes from the server and may contain URL-reserved characters.
        let url = format!(
            "{}/api/v1/{}/doc/{}",
            base_url.trim_end_matches('/'),
            index_name,
            urlencoding::encode(&quickwit_doc_id)
        );
        let rank = self.parse_timestamp_to_rank(timestamp_str);

        let mut tags = vec!["quickwit".to_string(), "logs".to_string()];
        if !level.is_empty() && level != "INFO" {
            tags.push(level.to_string());
        }
        if !service.is_empty() {
            tags.push(service.to_string());
        }

        Some(terraphim_types::Document {
            id: doc_id,
            title,
            body,
            url,
            description: Some(description),
            summarization: None,
            stub: None,
            tags: Some(tags),
            rank,
            source_haystack: Some(base_url.to_string()),
            doc_type: terraphim_types::DocumentType::KgEntry,
            synonyms: None,
            route: None,
            priority: None,
        })
    }

    /// Builds a document id of the form `quickwit_{index}_{doc_id}` and normalizes it with
    /// `Document::normalize_key`.
    ///
    /// NOTE(review): a placeholder `Document` is constructed only because `normalize_key` is
    /// called as a method here; if it does not depend on `self`, this could call it without
    /// the placeholder — confirm against `terraphim_types`.
    fn normalize_document_id(&self, index_name: &str, doc_id: &str) -> String {
        let original_id = format!("quickwit_{}_{}", index_name, doc_id);
        let dummy_doc = terraphim_types::Document {
            id: "dummy".to_string(),
            title: "dummy".to_string(),
            body: "dummy".to_string(),
            url: "dummy".to_string(),
            description: None,
            summarization: None,
            stub: None,
            tags: None,
            rank: None,
            source_haystack: None,
            doc_type: terraphim_types::DocumentType::KgEntry,
            synonyms: None,
            route: None,
            priority: None,
        };
        dummy_doc.normalize_key(&original_id)
    }

    /// Derives a numeric rank from a timestamp string by concatenating its first 14 ASCII
    /// digits, e.g. `2024-01-15T10:30:45Z` → `20240115103045`.
    ///
    /// For zero-padded `YYYY-MM-DD…HH:MM:SS` strings, larger ranks mean later times.
    /// Returns `None` for empty input or input without digits.
    fn parse_timestamp_to_rank(&self, timestamp_str: &str) -> Option<u64> {
        if timestamp_str.is_empty() {
            return None;
        }
        let sortable: String = timestamp_str
            .chars()
            .filter(char::is_ascii_digit)
            .take(14)
            .collect();
        sortable.parse::<u64>().ok()
    }

    /// Returns a log-safe form of `token`: its first four characters followed by `...`, or
    /// `***` when the token has four characters or fewer.
    ///
    /// Counts `char`s so that non-ASCII tokens cannot cause a slicing panic.
    // Kept for redacting credentials in log output; currently has no caller.
    #[allow(dead_code)]
    fn redact_token(&self, token: &str) -> String {
        match token.char_indices().nth(4) {
            Some((byte_idx, _)) => format!("{}...", &token[..byte_idx]),
            None => "***".to_string(),
        }
    }
}
impl IndexMiddleware for QuickwitHaystackIndexer {
fn index(
&self,
needle: &str,
haystack: &Haystack,
) -> impl std::future::Future<Output = Result<Index>> + Send {
let needle = needle.to_string();
let haystack = haystack.clone();
let client = self.client.clone();
async move {
let indexer = QuickwitHaystackIndexer { client };
log::info!(
"QuickwitHaystackIndexer::index called for '{}' at {}",
needle,
haystack.location
);
let config = indexer.parse_config(&haystack);
let base_url = &haystack.location;
let indexes_to_search: Vec<String> =
if let Some(ref explicit_index) = config.default_index {
log::info!("Using explicit index: {}", explicit_index);
vec![explicit_index.clone()]
} else {
log::info!("Auto-discovering Quickwit indexes from {}", base_url);
let discovered = indexer.fetch_available_indexes(base_url, &config).await;
if discovered.is_empty() {
log::warn!("No indexes discovered from Quickwit at {}", base_url);
return Ok(Index::new());
}
let filtered = if let Some(ref pattern) = config.index_filter {
log::info!("Applying index filter pattern: {}", pattern);
indexer.filter_indexes(discovered, pattern)
} else {
discovered
};
if filtered.is_empty() {
log::warn!("No indexes match filter pattern: {:?}", config.index_filter);
return Ok(Index::new());
}
log::info!(
"Searching {} indexes: {:?}",
filtered.len(),
filtered.iter().map(|i| &i.index_id).collect::<Vec<_>>()
);
filtered.into_iter().map(|i| i.index_id).collect()
};
let mut merged_index = Index::new();
for index_name in &indexes_to_search {
match indexer
.search_single_index(&needle, index_name, base_url, &config)
.await
{
Ok(index_result) => {
log::debug!(
"Index '{}' returned {} documents",
index_name,
index_result.len()
);
merged_index.extend(index_result);
}
Err(e) => {
log::warn!("Error searching index '{}': {}", index_name, e);
}
}
}
log::info!(
"QuickwitHaystackIndexer completed: {} total documents from {} indexes",
merged_index.len(),
indexes_to_search.len()
);
Ok(merged_index)
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Builds a Quickwit haystack at `location` carrying `extra_parameters`.
    fn quickwit_haystack(location: &str, extra_parameters: HashMap<String, String>) -> Haystack {
        Haystack {
            location: location.to_string(),
            service: terraphim_config::ServiceType::Quickwit,
            read_only: true,
            fetch_content: false,
            atomic_server_secret: None,
            extra_parameters,
        }
    }

    /// Builds an owned string map from `(key, value)` pairs.
    fn params(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect()
    }

    /// A config with `parse_config`'s defaults and the given authentication fields.
    fn config_with_auth(
        token: Option<&str>,
        username: Option<&str>,
        password: Option<&str>,
    ) -> QuickwitConfig {
        QuickwitConfig {
            auth_token: token.map(str::to_string),
            auth_username: username.map(str::to_string),
            auth_password: password.map(str::to_string),
            default_index: None,
            index_filter: None,
            max_hits: 100,
            timeout_seconds: 10,
            sort_by: "-timestamp".to_string(),
        }
    }

    /// Returns the `Authorization` header that `add_auth_header` attaches for `config`, if any.
    fn authorization_header(config: &QuickwitConfig) -> Option<String> {
        let indexer = QuickwitHaystackIndexer::default();
        let request = indexer
            .add_auth_header(indexer.client.get("http://localhost/test"), config)
            .build()
            .expect("a request to a static, valid URL must build");
        request
            .headers()
            .get(reqwest::header::AUTHORIZATION)
            .map(|value| {
                value
                    .to_str()
                    .expect("Authorization header should be ASCII")
                    .to_string()
            })
    }

    /// Wraps index ids into `QuickwitIndexInfo` values.
    fn index_infos(ids: &[&str]) -> Vec<QuickwitIndexInfo> {
        ids.iter()
            .map(|id| QuickwitIndexInfo {
                index_id: id.to_string(),
            })
            .collect()
    }

    #[test]
    fn test_quickwit_indexer_initialization() {
        let indexer = QuickwitHaystackIndexer::default();
        let request = indexer
            .client
            .get("http://localhost:7280/api/v1/indexes")
            .build();
        assert!(request.is_ok());
    }

    #[tokio::test]
    async fn test_graceful_degradation_no_server() {
        let indexer = QuickwitHaystackIndexer::default();
        // Nothing is expected to listen on this port: the indexer must degrade to an empty result.
        let haystack = quickwit_haystack("http://localhost:59999", HashMap::new());
        let result = indexer.index("test", &haystack).await;
        assert!(result.is_ok());
        assert_eq!(result.unwrap().len(), 0);
    }

    #[test]
    fn test_parse_config_with_all_parameters() {
        let indexer = QuickwitHaystackIndexer::default();
        let haystack = quickwit_haystack(
            "http://localhost:7280",
            params(&[
                ("auth_token", "Bearer token123"),
                ("default_index", "workers-logs"),
                ("max_hits", "50"),
                ("timeout_seconds", "15"),
                ("sort_by", "-level"),
            ]),
        );
        let config = indexer.parse_config(&haystack);
        assert_eq!(config.auth_token, Some("Bearer token123".to_string()));
        assert_eq!(config.default_index, Some("workers-logs".to_string()));
        assert_eq!(config.max_hits, 50);
        assert_eq!(config.timeout_seconds, 15);
        assert_eq!(config.sort_by, "-level");
    }

    #[test]
    fn test_parse_config_with_defaults() {
        let indexer = QuickwitHaystackIndexer::default();
        let haystack = quickwit_haystack("http://localhost:7280", HashMap::new());
        let config = indexer.parse_config(&haystack);
        assert_eq!(config.auth_token, None);
        assert_eq!(config.default_index, None);
        assert_eq!(config.max_hits, 100);
        assert_eq!(config.timeout_seconds, 10);
        assert_eq!(config.sort_by, "-timestamp");
    }

    #[test]
    fn test_parse_config_with_basic_auth() {
        let indexer = QuickwitHaystackIndexer::default();
        let haystack = quickwit_haystack(
            "https://logs.terraphim.cloud/api",
            params(&[
                ("auth_username", "cloudflare"),
                ("auth_password", "secret123"),
                ("index_filter", "workers-*"),
            ]),
        );
        let config = indexer.parse_config(&haystack);
        assert_eq!(config.auth_username, Some("cloudflare".to_string()));
        assert_eq!(config.auth_password, Some("secret123".to_string()));
        assert_eq!(config.index_filter, Some("workers-*".to_string()));
        assert_eq!(config.auth_token, None);
    }

    #[test]
    fn test_parse_config_with_invalid_numbers() {
        let indexer = QuickwitHaystackIndexer::default();
        let haystack = quickwit_haystack(
            "http://localhost:7280",
            params(&[("max_hits", "invalid"), ("timeout_seconds", "not-a-number")]),
        );
        let config = indexer.parse_config(&haystack);
        assert_eq!(config.max_hits, 100);
        assert_eq!(config.timeout_seconds, 10);
    }

    /// Requires a running Quickwit server; set `QUICKWIT_URL` to point at it.
    #[tokio::test]
    #[ignore]
    async fn test_fetch_available_indexes_live() {
        let quickwit_url =
            std::env::var("QUICKWIT_URL").unwrap_or_else(|_| "http://localhost:7280".to_string());
        let indexer = QuickwitHaystackIndexer::default();
        let config = config_with_auth(None, None, None);
        let indexes = indexer
            .fetch_available_indexes(&quickwit_url, &config)
            .await;
        println!("Discovered {} indexes", indexes.len());
        for idx in &indexes {
            println!(" - {}", idx.index_id);
        }
    }

    #[test]
    fn test_auth_header_with_bearer_token() {
        let config = config_with_auth(Some("Bearer xyz123"), None, None);
        assert_eq!(
            authorization_header(&config).as_deref(),
            Some("Bearer xyz123")
        );
    }

    #[test]
    fn test_auth_header_with_basic_auth() {
        let config = config_with_auth(None, Some("cloudflare"), Some("secret123"));
        let header = authorization_header(&config).expect("basic auth should set a header");
        assert!(header.starts_with("Basic "), "unexpected header: {}", header);
    }

    #[test]
    fn test_auth_header_without_credentials() {
        let config = config_with_auth(None, None, None);
        assert_eq!(authorization_header(&config), None);
    }

    #[test]
    fn test_auth_header_priority() {
        // A token wins over basic-auth credentials when both are configured.
        let config = config_with_auth(Some("Bearer xyz123"), Some("user"), Some("pass"));
        assert_eq!(
            authorization_header(&config).as_deref(),
            Some("Bearer xyz123")
        );
    }

    #[test]
    fn test_filter_indexes_exact_match() {
        let indexer = QuickwitHaystackIndexer::default();
        let indexes = index_infos(&["workers-logs", "cadro-service-layer", "api-logs"]);
        let filtered = indexer.filter_indexes(indexes, "workers-logs");
        assert_eq!(filtered.len(), 1);
        assert_eq!(filtered[0].index_id, "workers-logs");
    }

    #[test]
    fn test_filter_indexes_prefix_pattern() {
        let indexer = QuickwitHaystackIndexer::default();
        let indexes = index_infos(&["workers-logs", "workers-metrics", "api-logs"]);
        let filtered = indexer.filter_indexes(indexes, "workers-*");
        assert_eq!(filtered.len(), 2);
        assert!(filtered.iter().any(|i| i.index_id == "workers-logs"));
        assert!(filtered.iter().any(|i| i.index_id == "workers-metrics"));
    }

    #[test]
    fn test_filter_indexes_suffix_pattern() {
        let indexer = QuickwitHaystackIndexer::default();
        let indexes = index_infos(&["workers-logs", "api-logs", "service-metrics"]);
        let filtered = indexer.filter_indexes(indexes, "*-logs");
        assert_eq!(filtered.len(), 2);
        assert!(filtered.iter().any(|i| i.index_id == "workers-logs"));
        assert!(filtered.iter().any(|i| i.index_id == "api-logs"));
    }

    #[test]
    fn test_filter_indexes_contains_pattern() {
        let indexer = QuickwitHaystackIndexer::default();
        let indexes = index_infos(&["workers-logs", "api-logs-prod", "service-metrics"]);
        let filtered = indexer.filter_indexes(indexes, "*logs*");
        assert_eq!(filtered.len(), 2);
        assert!(filtered.iter().any(|i| i.index_id == "workers-logs"));
        assert!(filtered.iter().any(|i| i.index_id == "api-logs-prod"));
    }

    #[test]
    fn test_filter_indexes_wildcard_all() {
        let indexer = QuickwitHaystackIndexer::default();
        let indexes = index_infos(&["workers-logs", "api-logs"]);
        let filtered = indexer.filter_indexes(indexes, "*");
        assert_eq!(filtered.len(), 2);
    }

    #[test]
    fn test_filter_indexes_no_matches() {
        let indexer = QuickwitHaystackIndexer::default();
        let indexes = index_infos(&["workers-logs", "api-logs"]);
        let filtered = indexer.filter_indexes(indexes, "nonexistent-*");
        assert_eq!(filtered.len(), 0);
    }

    #[test]
    fn test_build_search_url_encodes_query_and_trims_base() {
        let indexer = QuickwitHaystackIndexer::default();
        let config = config_with_auth(None, None, None);
        let url = indexer.build_search_url(
            "http://localhost:7280/",
            "workers-logs",
            "error rate",
            &config,
        );
        assert_eq!(
            url,
            "http://localhost:7280/api/v1/workers-logs/search?query=error%20rate&max_hits=100&sort_by=-timestamp"
        );
    }

    #[test]
    fn test_parse_timestamp_to_rank() {
        let indexer = QuickwitHaystackIndexer::default();
        assert_eq!(
            indexer.parse_timestamp_to_rank("2024-01-15T10:30:45.123Z"),
            Some(20240115103045)
        );
        assert_eq!(indexer.parse_timestamp_to_rank(""), None);
        assert_eq!(indexer.parse_timestamp_to_rank("no digits"), None);
    }
}