use axum::{
extract::{Multipart, Path, Query, State},
http::StatusCode,
Json,
};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use utoipa::{IntoParams, ToSchema};
use uuid::Uuid;
use crate::{
error::{ApiError, ApiResult},
services::SpeciesInfo,
AppContext, ProcessingEvent, ProcessingStatus,
};
/// API representation of an uploaded audio recording and its processing state.
#[derive(Debug, Clone, Serialize, ToSchema)]
pub struct Recording {
/// Identifier of the recording.
#[schema(example = "550e8400-e29b-41d4-a716-446655440000")]
pub id: Uuid,
/// File name associated with the recording.
#[schema(example = "dawn_chorus_2024.wav")]
pub filename: String,
/// Audio duration in seconds.
#[schema(example = 120.5)]
pub duration_secs: f64,
/// Sample rate of the audio (the example value 44100 suggests Hz — confirm).
#[schema(example = 44100)]
pub sample_rate: u32,
/// Number of audio channels.
#[schema(example = 1)]
pub channels: u16,
/// Processing status of the recording.
pub status: ProcessingStatus,
/// Number of segments associated with the recording.
#[schema(example = 42)]
pub segment_count: usize,
/// Creation timestamp (UTC).
pub created_at: DateTime<Utc>,
/// Last-update timestamp (UTC).
pub updated_at: DateTime<Utc>,
}
/// Response body returned after a recording upload has been accepted.
#[derive(Debug, Serialize, ToSchema)]
pub struct UploadResponse {
/// The recording that was created by the upload.
pub recording: Recording,
/// Path clients can use to follow processing status for this recording.
#[schema(example = "/ws/recordings/550e8400-e29b-41d4-a716-446655440000")]
pub status_url: String,
}
/// Query-string parameters accepted by the segment neighbor lookup.
///
/// Every field is optional; the handler supplies defaults for unset values.
#[derive(Debug, Deserialize, IntoParams, ToSchema)]
pub struct NeighborParams {
/// Number of neighbors requested (documented range 1-100, default 10).
#[param(default = 10, minimum = 1, maximum = 100)]
pub k: Option<usize>,
/// Minimum similarity forwarded to the vector index search (default 0.0).
#[param(default = 0.0, minimum = 0.0, maximum = 1.0)]
pub min_similarity: Option<f32>,
/// Optional species filter.
// NOTE(review): the neighbor handler in this file never reads this field — confirm whether
// filtering is expected to happen elsewhere.
pub species: Option<String>,
/// When `true`, each returned neighbor carries an `audio_url`.
#[param(default = false)]
pub include_audio: Option<bool>,
}
/// A single nearest-neighbor hit returned for a query segment.
#[derive(Debug, Clone, Serialize, ToSchema)]
pub struct Neighbor {
/// Identifier of the neighboring segment.
pub segment_id: Uuid,
/// Identifier of the recording the neighboring segment belongs to.
pub recording_id: Uuid,
/// Similarity score; the neighbor handler derives it as `1.0 - distance`.
#[schema(example = 0.95)]
pub similarity: f32,
/// Distance reported by the vector index for this hit.
#[schema(example = 0.123)]
pub distance: f32,
/// Start of the segment (presumably seconds from the start of the recording — confirm).
#[schema(example = 12.5)]
pub start_time: f64,
/// End of the segment (same unit as `start_time`).
#[schema(example = 14.2)]
pub end_time: f64,
/// Species information attached to the segment, if any.
pub species: Option<SpeciesInfo>,
/// URL of the segment audio; only populated when the caller asked for it.
pub audio_url: Option<String>,
}
/// API representation of a cluster produced by the cluster engine.
#[derive(Debug, Clone, Serialize, ToSchema)]
pub struct Cluster {
/// Identifier of the cluster.
pub id: Uuid,
/// Optional human-assigned label.
pub label: Option<String>,
/// Size of the cluster (presumably the number of member segments — confirm).
#[schema(example = 156)]
pub size: usize,
/// Centroid vector of the cluster.
pub centroid: Vec<f32>,
/// Density value reported by the cluster engine.
#[schema(example = 0.87)]
pub density: f32,
/// Identifiers of exemplar items (presumably segment IDs — confirm).
pub exemplar_ids: Vec<Uuid>,
/// Per-species counts for the cluster.
pub species_distribution: Vec<SpeciesCount>,
/// Creation timestamp (UTC).
pub created_at: DateTime<Utc>,
}
/// One entry of a cluster's species distribution.
#[derive(Debug, Clone, Serialize, ToSchema)]
pub struct SpeciesCount {
/// Species name (the example uses a common name).
#[schema(example = "American Robin")]
pub name: String,
/// Scientific name, when known. The cluster handlers in this file always set it to `None`.
#[schema(example = "Turdus migratorius")]
pub scientific_name: Option<String>,
/// Number of occurrences of this species.
#[schema(example = 42)]
pub count: usize,
/// Share of this species (the example 27.3 suggests a 0-100 scale — confirm).
#[schema(example = 27.3)]
pub percentage: f64,
}
/// Evidence bundle describing a query segment and its neighbors.
#[derive(Debug, Clone, Serialize, ToSchema)]
pub struct EvidencePack {
/// Identifier of the query this pack belongs to; also used to look the pack up again.
pub query_id: Uuid,
/// Summary of the segment the evidence was generated for.
pub query_segment: SegmentSummary,
/// Per-neighbor evidence entries.
pub neighbors: Vec<NeighborEvidence>,
/// Acoustic features listed as shared for this pack.
pub shared_features: Vec<AcousticFeature>,
/// Links to visualizations accompanying the pack.
pub visualizations: EvidenceVisualizations,
/// Timestamp (UTC) at which the pack was generated.
pub generated_at: DateTime<Utc>,
}
/// Compact description of a segment used in search results and evidence packs.
#[derive(Debug, Clone, Serialize, ToSchema)]
pub struct SegmentSummary {
/// Identifier of the segment.
pub id: Uuid,
/// Identifier of the recording the segment belongs to.
pub recording_id: Uuid,
/// Start of the segment (presumably seconds from the start of the recording — confirm).
pub start_time: f64,
/// End of the segment (same unit as `start_time`).
pub end_time: f64,
/// Species information attached to the segment, if any.
pub species: Option<SpeciesInfo>,
}
/// Evidence for one neighbor of the query segment.
#[derive(Debug, Clone, Serialize, ToSchema)]
pub struct NeighborEvidence {
/// Summary of the neighboring segment.
pub segment: SegmentSummary,
/// Similarity between the query segment and this neighbor.
pub similarity: f32,
/// Features listed as contributing to the similarity.
pub contributing_features: Vec<FeatureContribution>,
/// URL of a spectrogram comparison, if one is available.
pub spectrogram_comparison_url: Option<String>,
}
/// A single feature's contribution to a query/neighbor match.
#[derive(Debug, Clone, Serialize, ToSchema)]
pub struct FeatureContribution {
/// Name of the feature.
#[schema(example = "fundamental_frequency")]
pub name: String,
/// Weight assigned to this feature.
#[schema(example = 0.23)]
pub weight: f32,
/// Value of the feature for the query segment.
pub query_value: f64,
/// Value of the feature for the neighbor segment.
pub neighbor_value: f64,
}
/// A named acoustic feature with a human-readable description.
#[derive(Debug, Clone, Serialize, ToSchema)]
pub struct AcousticFeature {
/// Name of the feature.
#[schema(example = "frequency_modulation")]
pub name: String,
/// Human-readable description of the feature.
#[schema(example = "Rapid upward sweep in 200-400ms")]
pub description: String,
/// Confidence value attached to the feature (example 0.92; range not visible here).
#[schema(example = 0.92)]
pub confidence: f32,
}
/// Optional links to visualizations that accompany an evidence pack.
#[derive(Debug, Clone, Serialize, ToSchema)]
pub struct EvidenceVisualizations {
/// URL of a UMAP visualization, if available.
pub umap_url: Option<String>,
/// URL of a spectrogram grid, if available.
pub spectrogram_grid_url: Option<String>,
/// URL of a feature-importance visualization, if available.
pub feature_importance_url: Option<String>,
}
/// Request body for the search endpoint.
///
/// At least one of `query`, `segment_id` or `embedding` must be present. When several are
/// given, the search handler prefers `query`, then `segment_id`, then `embedding`.
#[derive(Debug, Deserialize, ToSchema)]
pub struct SearchQuery {
/// Free-text query that is embedded before searching.
#[schema(example = "ascending whistle followed by trill")]
pub query: Option<String>,
/// Segment whose stored embedding is used as the search vector.
pub segment_id: Option<Uuid>,
/// Raw embedding used directly as the search vector.
pub embedding: Option<Vec<f32>>,
/// Maximum number of results (documented range 1-200, default 20).
#[schema(default = 20, minimum = 1, maximum = 200)]
pub limit: Option<usize>,
/// Optional species filter.
// NOTE(review): `species_filter`, `time_start` and `time_end` are not read by the search
// handler in this file — confirm whether filtering is still to be implemented.
pub species_filter: Option<Vec<String>>,
/// Optional lower time bound (UTC).
pub time_start: Option<DateTime<Utc>>,
/// Optional upper time bound (UTC).
pub time_end: Option<DateTime<Utc>>,
}
/// Response body of the search endpoint.
#[derive(Debug, Serialize, ToSchema)]
pub struct SearchResults {
/// Echo of the parts of the request that were used.
pub query: SearchQueryEcho,
/// Matching segments with their scores.
pub results: Vec<SearchResult>,
/// Result count reported by the handler.
pub total_count: usize,
/// Time the handler measured for the search, in milliseconds.
pub latency_ms: u64,
}
/// Echo of the search request as it was interpreted by the handler.
#[derive(Debug, Serialize, ToSchema)]
pub struct SearchQueryEcho {
/// Query text from the request, if any.
pub text: Option<String>,
/// Segment ID from the request, if any.
pub segment_id: Option<Uuid>,
/// Effective result limit after the handler applied its default and cap.
pub limit: usize,
}
/// One hit in a search response.
#[derive(Debug, Serialize, ToSchema)]
pub struct SearchResult {
/// Summary of the matching segment.
pub segment: SegmentSummary,
/// Score of the hit; the search handler derives it as `1.0 - distance`.
#[schema(example = 0.87)]
pub score: f32,
/// Optional highlight text. The search handler in this file always sets it to `None`.
pub highlight: Option<String>,
}
/// Request body for assigning a label to a cluster.
#[derive(Debug, Deserialize, ToSchema)]
pub struct AssignLabelRequest {
/// Label text to assign.
#[schema(example = "Northern Cardinal song type A")]
pub label: String,
}
/// Request body for generating an evidence pack.
#[derive(Debug, Deserialize, ToSchema)]
pub struct GenerateEvidenceRequest {
/// Segment the evidence pack is generated for.
pub segment_id: Uuid,
/// Number of neighbors to include (default 10).
#[schema(default = 10)]
pub k: Option<usize>,
/// Whether spectrograms should be included.
// NOTE(review): the evidence handler in this file never reads this field — confirm intent.
#[schema(default = true)]
pub include_spectrograms: Option<bool>,
}
/// Accepts a multipart audio upload, validates it and queues it for background processing.
///
/// The audio payload is read from the multipart field named `file` or `audio`; other fields
/// are skipped. If several matching fields are sent, the last one wins. The payload's
/// metadata is extracted *before* the `Queued` event is published and the background task is
/// spawned, so an upload that fails validation never starts processing.
///
/// # Errors
///
/// - `ApiError::BadRequest` if the multipart stream is malformed, a field cannot be read, or
///   no audio field was supplied.
/// - `ApiError::PayloadTooLarge` if the file exceeds `ctx.config.max_upload_size`.
/// - Any error returned by `audio_pipeline.get_metadata` is propagated unchanged.
#[utoipa::path(
    post,
    path = "/recordings",
    request_body(content = Vec<u8>, content_type = "multipart/form-data"),
    responses(
        (status = 201, description = "Recording uploaded and processing started", body = UploadResponse),
        (status = 400, description = "Invalid audio file", body = crate::error::ErrorResponse),
        (status = 413, description = "File too large", body = crate::error::ErrorResponse),
        (status = 415, description = "Unsupported audio format", body = crate::error::ErrorResponse),
    ),
    tag = "recordings"
)]
pub async fn upload_recording(
    State(ctx): State<AppContext>,
    mut multipart: Multipart,
) -> ApiResult<(StatusCode, Json<UploadResponse>)> {
    let mut audio_data: Option<Vec<u8>> = None;
    let mut filename: Option<String> = None;
    let mut content_type: Option<String> = None;

    while let Some(field) = multipart
        .next_field()
        .await
        .map_err(|e| ApiError::BadRequest(format!("Invalid multipart data: {e}")))?
    {
        // Only the audio part is of interest; any other form field is ignored.
        if !matches!(field.name(), Some("file" | "audio")) {
            continue;
        }

        filename = field.file_name().map(String::from);
        content_type = field.content_type().map(String::from);

        let data = field
            .bytes()
            .await
            .map_err(|e| ApiError::BadRequest(format!("Failed to read file: {e}")))?;
        if data.len() > ctx.config.max_upload_size {
            return Err(ApiError::PayloadTooLarge(format!(
                "File size {} exceeds maximum {}",
                data.len(),
                ctx.config.max_upload_size
            )));
        }
        audio_data = Some(data.to_vec());
    }

    let audio_data =
        audio_data.ok_or_else(|| ApiError::BadRequest("No audio file provided".into()))?;
    let filename = filename.unwrap_or_else(|| "unknown.wav".to_string());

    // An unexpected content type is only logged; the upload is deliberately still accepted.
    let valid_types = ["audio/wav", "audio/x-wav", "audio/flac", "audio/mpeg", "audio/ogg"];
    if let Some(ref ct) = content_type {
        if !valid_types.iter().any(|t| ct.contains(t)) {
            tracing::warn!(content_type = %ct, "Unknown content type, proceeding anyway");
        }
    }

    // Validate first: if the metadata cannot be read the request fails here, before any
    // event has been published or any background work has been started.
    let (duration, sample_rate, channels) = ctx.audio_pipeline.get_metadata(&audio_data)?;

    let recording_id = Uuid::new_v4();
    let now = Utc::now();
    ctx.publish_event(ProcessingEvent {
        recording_id,
        status: ProcessingStatus::Queued,
        progress: 0.0,
        message: Some("Recording queued for processing".into()),
    });

    // The context and the audio buffer are not needed afterwards, so both are moved into
    // the detached task instead of being cloned.
    tokio::spawn(async move {
        process_recording(ctx, recording_id, audio_data).await;
    });

    let recording = Recording {
        id: recording_id,
        filename,
        duration_secs: duration,
        sample_rate,
        channels,
        status: ProcessingStatus::Queued,
        segment_count: 0,
        created_at: now,
        updated_at: now,
    };
    let response = UploadResponse {
        recording,
        status_url: format!("/ws/recordings/{recording_id}"),
    };
    Ok((StatusCode::CREATED, Json(response)))
}
/// Runs the processing pipeline for one uploaded recording, publishing a progress event
/// before each stage: load, segment, embed, index, cluster analysis.
///
/// A failure while loading, segmenting, embedding or indexing publishes a `Failed` event
/// (progress `0.0`) and stops. A failure of the cluster update is only logged as a warning,
/// and the run still ends with a `Complete` event.
async fn process_recording(ctx: AppContext, recording_id: Uuid, audio_data: Vec<u8>) {
    // Builds an event for this recording; `move` copies the id into the closure.
    let event = move |status, progress, message: String| ProcessingEvent {
        recording_id,
        status,
        progress,
        message: Some(message),
    };

    ctx.publish_event(event(
        ProcessingStatus::Loading,
        0.1,
        "Loading audio file".into(),
    ));
    let audio = match ctx.audio_pipeline.load_audio(&audio_data) {
        Ok(loaded) => loaded,
        Err(e) => {
            ctx.publish_event(event(
                ProcessingStatus::Failed,
                0.0,
                format!("Failed to load audio: {e}"),
            ));
            return;
        }
    };

    ctx.publish_event(event(
        ProcessingStatus::Segmenting,
        0.3,
        "Detecting call segments".into(),
    ));
    let segments = match ctx.audio_pipeline.segment(&audio) {
        Ok(found) => found,
        Err(e) => {
            ctx.publish_event(event(
                ProcessingStatus::Failed,
                0.0,
                format!("Segmentation failed: {e}"),
            ));
            return;
        }
    };

    ctx.publish_event(event(
        ProcessingStatus::Embedding,
        0.5,
        format!("Generating embeddings for {} segments", segments.len()),
    ));
    let embeddings = match ctx.embedding_model.embed_batch(&segments).await {
        Ok(vectors) => vectors,
        Err(e) => {
            ctx.publish_event(event(
                ProcessingStatus::Failed,
                0.0,
                format!("Embedding failed: {e}"),
            ));
            return;
        }
    };

    ctx.publish_event(event(
        ProcessingStatus::Indexing,
        0.7,
        "Adding to vector index".into(),
    ));
    if let Err(e) = ctx.vector_index.add_batch(&embeddings) {
        ctx.publish_event(event(
            ProcessingStatus::Failed,
            0.0,
            format!("Indexing failed: {e}"),
        ));
        return;
    }

    ctx.publish_event(event(
        ProcessingStatus::Analyzing,
        0.9,
        "Running cluster analysis".into(),
    ));
    // Clustering is best-effort: a failure here does not fail the recording.
    if let Err(e) = ctx.cluster_engine.update_clusters(&embeddings) {
        tracing::warn!(error = %e, "Cluster update failed, continuing");
    }

    ctx.publish_event(event(
        ProcessingStatus::Complete,
        1.0,
        format!(
            "Processing complete: {} segments indexed",
            segments.len()
        ),
    ));
}
/// Looks up a recording by ID.
///
/// NOTE(review): this handler does not consult any store; it answers every request with a
/// "not found" error for the given ID, including IDs just returned by an upload.
#[utoipa::path(
    get,
    path = "/recordings/{id}",
    params(
        ("id" = Uuid, Path, description = "Recording ID")
    ),
    responses(
        (status = 200, description = "Recording found", body = Recording),
        (status = 404, description = "Recording not found", body = crate::error::ErrorResponse),
    ),
    tag = "recordings"
)]
pub async fn get_recording(
    State(_ctx): State<AppContext>,
    Path(id): Path<Uuid>,
) -> ApiResult<Json<Recording>> {
    let missing = ApiError::not_found("Recording", id);
    Err(missing)
}
/// Returns the nearest neighbors of a segment from the vector index.
///
/// `k` defaults to 10 and is clamped to the documented range `1..=100`. Because the index
/// may return the query segment itself (it is filtered out below), one extra hit is
/// requested and the list is then truncated to `k`, so the caller receives up to `k` real
/// neighbors rather than `k - 1`.
///
/// NOTE(review): `params.species` is accepted but not applied as a filter here.
///
/// # Errors
///
/// Returns a not-found error when the segment has no stored embedding; errors from the
/// vector index are propagated.
#[utoipa::path(
    get,
    path = "/segments/{id}/neighbors",
    params(
        ("id" = Uuid, Path, description = "Segment ID"),
        NeighborParams
    ),
    responses(
        (status = 200, description = "Neighbors found", body = Vec<Neighbor>),
        (status = 404, description = "Segment not found", body = crate::error::ErrorResponse),
    ),
    tag = "segments"
)]
pub async fn get_neighbors(
    State(ctx): State<AppContext>,
    Path(segment_id): Path<Uuid>,
    Query(params): Query<NeighborParams>,
) -> ApiResult<Json<Vec<Neighbor>>> {
    let k = params.k.unwrap_or(10).clamp(1, 100);
    let min_similarity = params.min_similarity.unwrap_or(0.0);
    let include_audio = params.include_audio.unwrap_or(false);

    let embedding = ctx
        .vector_index
        .get_embedding(&segment_id)?
        .ok_or_else(|| ApiError::not_found("Segment", segment_id))?;

    // Request k + 1 hits so that dropping the query segment still leaves k neighbors.
    let results = ctx.vector_index.search(&embedding, k + 1, min_similarity)?;

    let neighbors: Vec<Neighbor> = results
        .into_iter()
        .filter(|r| r.id != segment_id)
        .take(k)
        .map(|r| Neighbor {
            segment_id: r.id,
            recording_id: r.recording_id,
            similarity: 1.0 - r.distance,
            distance: r.distance,
            start_time: r.start_time,
            end_time: r.end_time,
            species: r.species,
            audio_url: if include_audio {
                Some(format!("/api/v1/segments/{}/audio", r.id))
            } else {
                None
            },
        })
        .collect();

    Ok(Json(neighbors))
}
/// Lists every cluster known to the cluster engine.
///
/// Each engine record is converted to the API `Cluster` shape. The species distribution is
/// mapped from `(name, count, percentage)` tuples, and `scientific_name` is left unset.
///
/// # Errors
///
/// Errors from the cluster engine are propagated.
#[utoipa::path(
    get,
    path = "/clusters",
    responses(
        (status = 200, description = "Clusters retrieved", body = Vec<Cluster>),
    ),
    tag = "clusters"
)]
pub async fn list_clusters(State(ctx): State<AppContext>) -> ApiResult<Json<Vec<Cluster>>> {
    let cluster_data = ctx.cluster_engine.get_all_clusters()?;
    let clusters: Vec<Cluster> = cluster_data
        .into_iter()
        .map(|c| Cluster {
            id: c.id,
            label: c.label,
            size: c.size,
            centroid: c.centroid,
            density: c.density,
            exemplar_ids: c.exemplar_ids,
            species_distribution: c
                .species_distribution
                .into_iter()
                // `name` is already owned here, so it is moved rather than cloned.
                .map(|(name, count, percentage)| SpeciesCount {
                    name,
                    scientific_name: None,
                    count,
                    percentage,
                })
                .collect(),
            created_at: c.created_at,
        })
        .collect();
    Ok(Json(clusters))
}
#[utoipa::path(
get,
path = "/clusters/{id}",
params(
("id" = Uuid, Path, description = "Cluster ID")
),
responses(
(status = 200, description = "Cluster found", body = Cluster),
(status = 404, description = "Cluster not found", body = crate::error::ErrorResponse),
),
tag = "clusters"
)]
pub async fn get_cluster(
State(ctx): State<AppContext>,
Path(id): Path<Uuid>,
) -> ApiResult<Json<Cluster>> {
let cluster_data = ctx
.cluster_engine
.get_cluster(&id)?
.ok_or_else(|| ApiError::not_found("Cluster", id))?;
let cluster = Cluster {
id: cluster_data.id,
label: cluster_data.label,
size: cluster_data.size,
centroid: cluster_data.centroid,
density: cluster_data.density,
exemplar_ids: cluster_data.exemplar_ids,
species_distribution: cluster_data
.species_distribution
.into_iter()
.map(|(name, count, percentage)| SpeciesCount {
name,
scientific_name: None,
count,
percentage,
})
.collect(),
created_at: cluster_data.created_at,
};
Ok(Json(cluster))
}
#[utoipa::path(
put,
path = "/clusters/{id}/label",
params(
("id" = Uuid, Path, description = "Cluster ID")
),
request_body = AssignLabelRequest,
responses(
(status = 200, description = "Label assigned", body = Cluster),
(status = 404, description = "Cluster not found", body = crate::error::ErrorResponse),
),
tag = "clusters"
)]
pub async fn assign_cluster_label(
State(ctx): State<AppContext>,
Path(id): Path<Uuid>,
Json(request): Json<AssignLabelRequest>,
) -> ApiResult<Json<Cluster>> {
let cluster_data = ctx
.cluster_engine
.assign_label(&id, &request.label)?
.ok_or_else(|| ApiError::not_found("Cluster", id))?;
let cluster = Cluster {
id: cluster_data.id,
label: cluster_data.label,
size: cluster_data.size,
centroid: cluster_data.centroid,
density: cluster_data.density,
exemplar_ids: cluster_data.exemplar_ids,
species_distribution: cluster_data
.species_distribution
.into_iter()
.map(|(name, count, percentage)| SpeciesCount {
name,
scientific_name: None,
count,
percentage,
})
.collect(),
created_at: cluster_data.created_at,
};
Ok(Json(cluster))
}
#[utoipa::path(
get,
path = "/evidence/{id}",
params(
("id" = String, Path, description = "Evidence pack ID (query UUID)")
),
responses(
(status = 200, description = "Evidence pack retrieved", body = EvidencePack),
(status = 404, description = "Evidence not found", body = crate::error::ErrorResponse),
),
tag = "evidence"
)]
pub async fn get_evidence_pack(
State(ctx): State<AppContext>,
Path(id): Path<String>,
) -> ApiResult<Json<EvidencePack>> {
let query_id = Uuid::parse_str(&id)
.map_err(|_| ApiError::BadRequest(format!("Invalid UUID: {id}")))?;
let evidence = ctx
.interpretation_engine
.get_evidence_pack(&query_id)?
.ok_or_else(|| ApiError::not_found("Evidence pack", id))?;
let pack = EvidencePack {
query_id: evidence.query_id,
query_segment: SegmentSummary {
id: evidence.query_segment.id,
recording_id: evidence.query_segment.recording_id,
start_time: evidence.query_segment.start_time,
end_time: evidence.query_segment.end_time,
species: evidence.query_segment.species,
},
neighbors: evidence
.neighbors
.into_iter()
.map(|n| NeighborEvidence {
segment: SegmentSummary {
id: n.segment.id,
recording_id: n.segment.recording_id,
start_time: n.segment.start_time,
end_time: n.segment.end_time,
species: n.segment.species,
},
similarity: n.similarity,
contributing_features: n
.contributing_features
.into_iter()
.map(|f| FeatureContribution {
name: f.name,
weight: f.weight,
query_value: f.query_value,
neighbor_value: f.neighbor_value,
})
.collect(),
spectrogram_comparison_url: n.spectrogram_comparison_url,
})
.collect(),
shared_features: evidence
.shared_features
.into_iter()
.map(|f| AcousticFeature {
name: f.name,
description: f.description,
confidence: f.confidence,
})
.collect(),
visualizations: EvidenceVisualizations {
umap_url: evidence.visualizations.umap_url,
spectrogram_grid_url: evidence.visualizations.spectrogram_grid_url,
feature_importance_url: evidence.visualizations.feature_importance_url,
},
generated_at: evidence.generated_at,
};
Ok(Json(pack))
}
/// Generates an evidence pack for a segment from its nearest neighbors.
///
/// `k` defaults to 10 and is clamped to `1..=100`, the same bound the neighbor lookup
/// uses, so a client-supplied value cannot request an unbounded search.
///
/// NOTE(review): `request.include_spectrograms` is accepted but not used here, and the
/// search hits are handed to the interpretation engine without removing the query segment
/// itself — confirm both are intended.
///
/// # Errors
///
/// Returns a not-found error when the segment has no stored embedding; errors from the
/// vector index and the interpretation engine are propagated.
#[utoipa::path(
    post,
    path = "/evidence",
    request_body = GenerateEvidenceRequest,
    responses(
        (status = 201, description = "Evidence pack generated", body = EvidencePack),
        (status = 404, description = "Segment not found", body = crate::error::ErrorResponse),
    ),
    tag = "evidence"
)]
pub async fn generate_evidence_pack(
    State(ctx): State<AppContext>,
    Json(request): Json<GenerateEvidenceRequest>,
) -> ApiResult<(StatusCode, Json<EvidencePack>)> {
    /// Upper bound on neighbors per evidence pack; mirrors the neighbor endpoint's cap.
    const MAX_EVIDENCE_NEIGHBORS: usize = 100;

    let embedding = ctx
        .vector_index
        .get_embedding(&request.segment_id)?
        .ok_or_else(|| ApiError::not_found("Segment", request.segment_id))?;

    let k = request.k.unwrap_or(10).clamp(1, MAX_EVIDENCE_NEIGHBORS);
    let neighbors = ctx.vector_index.search(&embedding, k, 0.0)?;

    let evidence = ctx
        .interpretation_engine
        .generate_evidence_pack(&request.segment_id, &neighbors)
        .await?;

    let pack = EvidencePack {
        query_id: evidence.query_id,
        query_segment: SegmentSummary {
            id: evidence.query_segment.id,
            recording_id: evidence.query_segment.recording_id,
            start_time: evidence.query_segment.start_time,
            end_time: evidence.query_segment.end_time,
            species: evidence.query_segment.species,
        },
        neighbors: evidence
            .neighbors
            .into_iter()
            .map(|n| NeighborEvidence {
                segment: SegmentSummary {
                    id: n.segment.id,
                    recording_id: n.segment.recording_id,
                    start_time: n.segment.start_time,
                    end_time: n.segment.end_time,
                    species: n.segment.species,
                },
                similarity: n.similarity,
                contributing_features: n
                    .contributing_features
                    .into_iter()
                    .map(|f| FeatureContribution {
                        name: f.name,
                        weight: f.weight,
                        query_value: f.query_value,
                        neighbor_value: f.neighbor_value,
                    })
                    .collect(),
                spectrogram_comparison_url: n.spectrogram_comparison_url,
            })
            .collect(),
        shared_features: evidence
            .shared_features
            .into_iter()
            .map(|f| AcousticFeature {
                name: f.name,
                description: f.description,
                confidence: f.confidence,
            })
            .collect(),
        visualizations: EvidenceVisualizations {
            umap_url: evidence.visualizations.umap_url,
            spectrogram_grid_url: evidence.visualizations.spectrogram_grid_url,
            feature_importance_url: evidence.visualizations.feature_importance_url,
        },
        generated_at: evidence.generated_at,
    };
    Ok((StatusCode::CREATED, Json(pack)))
}
#[utoipa::path(
post,
path = "/search",
request_body = SearchQuery,
responses(
(status = 200, description = "Search results", body = SearchResults),
(status = 400, description = "Invalid search query", body = crate::error::ErrorResponse),
),
tag = "search"
)]
pub async fn search(
State(ctx): State<AppContext>,
Json(query): Json<SearchQuery>,
) -> ApiResult<Json<SearchResults>> {
let start = std::time::Instant::now();
if query.query.is_none() && query.segment_id.is_none() && query.embedding.is_none() {
return Err(ApiError::BadRequest(
"Must provide query text, segment_id, or embedding".into(),
));
}
let limit = query.limit.unwrap_or(20).min(200);
let search_embedding = if let Some(ref text) = query.query {
ctx.embedding_model.embed_text(text).await?
} else if let Some(segment_id) = query.segment_id {
ctx.vector_index
.get_embedding(&segment_id)?
.ok_or_else(|| ApiError::not_found("Segment", segment_id))?
} else if let Some(ref embedding) = query.embedding {
embedding.clone()
} else {
unreachable!()
};
let results = ctx.vector_index.search(&search_embedding, limit, 0.0)?;
let latency_ms = start.elapsed().as_millis() as u64;
let search_results = SearchResults {
query: SearchQueryEcho {
text: query.query,
segment_id: query.segment_id,
limit,
},
results: results
.into_iter()
.map(|r| SearchResult {
segment: SegmentSummary {
id: r.id,
recording_id: r.recording_id,
start_time: r.start_time,
end_time: r.end_time,
species: r.species,
},
score: 1.0 - r.distance,
highlight: None,
})
.collect(),
total_count: limit, latency_ms,
};
Ok(Json(search_results))
}
/// Reports service health together with the crate version.
///
/// The status is always `"healthy"`; the version comes from `CARGO_PKG_VERSION` at compile
/// time.
#[utoipa::path(
    get,
    path = "/health",
    responses(
        (status = 200, description = "Service healthy", body = crate::HealthResponse),
    ),
    tag = "system"
)]
pub async fn health_check() -> Json<crate::HealthResponse> {
    let report = crate::HealthResponse {
        status: "healthy".into(),
        version: env!("CARGO_PKG_VERSION").into(),
        // NOTE(review): uptime is hard-coded to zero; no start time is tracked here.
        uptime_secs: 0,
    };
    Json(report)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// An empty parameter object leaves every optional neighbor parameter unset, so the
    /// handler's defaults apply.
    #[test]
    fn test_neighbor_params_defaults() {
        let params: NeighborParams =
            serde_json::from_str("{}").expect("empty object should deserialize");
        assert!(params.k.is_none());
        assert!(params.min_similarity.is_none());
        assert!(params.species.is_none());
        assert!(params.include_audio.is_none());
    }

    /// An empty request body deserializes to a query with no selector at all, which is the
    /// state the search handler rejects as a bad request.
    #[test]
    fn test_search_query_validation() {
        let query: SearchQuery =
            serde_json::from_str("{}").expect("empty object should deserialize");
        assert!(query.query.is_none() && query.segment_id.is_none() && query.embedding.is_none());
        assert!(query.limit.is_none());
        assert!(query.species_filter.is_none());
        assert!(query.time_start.is_none());
        assert!(query.time_end.is_none());
    }
}