use super::r#trait::{Tool, ToolCapability, ToolExecutionContext, ToolResult};
use async_trait::async_trait;
use serde_json::Value;
use std::time::Duration;
/// Base URL of the Gemini v1beta REST API; used for `generateContent` calls and
/// for polling the state of files uploaded through the Files API.
const GEMINI_BASE_URL: &str = "https://generativelanguage.googleapis.com/v1beta";
/// Endpoint that starts a resumable Files API upload.
const GEMINI_UPLOAD_URL: &str = "https://generativelanguage.googleapis.com/upload/v1beta/files";
/// Largest file size in bytes (18 MiB) that is sent inline as base64; anything
/// bigger is uploaded through the Files API instead.
const INLINE_MAX_BYTES: u64 = 18 * 1024 * 1024;
/// Maximum time to wait for an uploaded file to reach the `ACTIVE` state.
const FILES_API_POLL_TIMEOUT: Duration = Duration::from_secs(120);
/// Pause between two consecutive file-state polls.
const FILES_API_POLL_INTERVAL: Duration = Duration::from_secs(2);
/// Upper bound on the number of frames ffmpeg extracts in the fallback path.
const FALLBACK_MAX_FRAMES: usize = 30;
/// Sampling rate (frames per second) used by the ffmpeg fallback; also used to
/// estimate each frame's timestamp.
const FALLBACK_FPS: f64 = 1.0;
/// Tool that answers questions about a local video file using Google Gemini.
///
/// Holds the credentials and model name used for every request it issues.
pub struct AnalyzeVideoTool {
    /// API key sent in the `x-goog-api-key` header of each Gemini request.
    api_key: String,
    /// Model identifier interpolated into the `generateContent` URL.
    model: String,
}

impl AnalyzeVideoTool {
    /// Creates a tool bound to the given Gemini API key and model name.
    ///
    /// Both values are stored as-is; no validation or network access happens here.
    pub fn new(api_key: String, model: String) -> Self {
        AnalyzeVideoTool { api_key, model }
    }
}
#[async_trait]
impl Tool for AnalyzeVideoTool {
    /// Name under which this tool is exposed.
    fn name(&self) -> &str {
        "analyze_video"
    }

    /// Human/model-readable summary of what the tool does and when to use it.
    fn description(&self) -> &str {
        "Analyze a video file (local path) using Google Gemini multimodal vision. \
         Use when: the user attached a video and you need to understand its content, \
         the model needs to describe motion / sequence / spoken audio in a video, or \
         a `<<VID:path>>` marker is present in the prompt. Pass `question` to ask \
         something specific (e.g. 'transcribe the spoken audio', 'describe each frame \
         in chronological order'); defaults to a general detailed description. \
         Inline upload for files ≤ 18 MB, otherwise Files API."
    }

    /// JSON Schema of the accepted input: a required `video` path and an
    /// optional `question`.
    fn input_schema(&self) -> Value {
        serde_json::json!({
            "type": "object",
            "properties": {
                "video": {
                    "type": "string",
                    "description": "Local file path to the video (mp4, mov, webm, mkv, avi, 3gp, flv)."
                },
                "question": {
                    "type": "string",
                    "description": "What to ask about the video. Defaults to 'Describe this video in detail — actions, subjects, setting, and any spoken audio in chronological order.'"
                }
            },
            "required": ["video"]
        })
    }

    /// Declares that the tool uses the network and reads local files.
    fn capabilities(&self) -> Vec<ToolCapability> {
        vec![ToolCapability::Network, ToolCapability::ReadFiles]
    }

    /// This tool reports that it does not need approval.
    fn requires_approval(&self) -> bool {
        false
    }

    /// Analyzes the video named by `input["video"]`, answering `input["question"]`.
    ///
    /// Flow:
    /// 1. Validate the input and stat the file.
    /// 2. Try the native Gemini video path (inline or Files API, by size).
    /// 3. If that fails for any reason, fall back to ffmpeg frame extraction
    ///    plus per-frame image analysis.
    ///
    /// Invalid input (missing/empty `video`, unreadable path, path that is not
    /// a regular file) is reported as `Ok(ToolResult::error(..))`, not as `Err`.
    /// A missing, empty, or whitespace-only `question` uses the default prompt.
    async fn execute(
        &self,
        input: Value,
        context: &ToolExecutionContext,
    ) -> super::error::Result<ToolResult> {
        const DEFAULT_QUESTION: &str = "Describe this video in detail — actions, subjects, setting, \
             and any spoken audio in chronological order.";

        let video_path = match input["video"].as_str() {
            Some(s) if !s.is_empty() => s.to_string(),
            _ => {
                return Ok(ToolResult::error(
                    "Missing required parameter: video".to_string(),
                ));
            }
        };

        // A blank question carries no instruction for the model; treat it like
        // an absent one, mirroring how an empty `video` is handled above.
        let question = input["question"]
            .as_str()
            .filter(|q| !q.trim().is_empty())
            .unwrap_or(DEFAULT_QUESTION)
            .to_string();

        let metadata = match tokio::fs::metadata(&video_path).await {
            Ok(m) => m,
            Err(e) => {
                return Ok(ToolResult::error(format!(
                    "Failed to stat video file '{}': {}",
                    video_path, e
                )));
            }
        };

        // Directories and other non-regular files cannot be read as a video;
        // fail fast instead of running the upload and ffmpeg paths on them.
        if !metadata.is_file() {
            return Ok(ToolResult::error(format!(
                "Video path '{}' is not a regular file",
                video_path
            )));
        }

        let size = metadata.len();
        let mime_type = detect_video_mime_type(&video_path);
        tracing::info!(
            "analyze_video: path={} size={} mime={} model={}",
            video_path,
            size,
            mime_type,
            self.model,
        );

        // Both a hard error and an unsuccessful ToolResult from the native path
        // are folded into a message that the fallback includes in its output.
        let native_err: String = match self
            .try_native_video(&video_path, mime_type, size, &question)
            .await
        {
            Ok(result) if result.success => return Ok(result),
            Ok(failed) => failed.error.unwrap_or_else(|| "unknown error".to_string()),
            Err(e) => e.to_string(),
        };

        tracing::warn!(
            "analyze_video: native Gemini path failed ({}). Falling back to ffmpeg \
             frame extraction + per-frame vision.",
            native_err
        );
        self.frame_extraction_fallback(&video_path, &question, native_err, context)
            .await
    }
}
impl AnalyzeVideoTool {
/// Sends the whole video to Gemini and returns the model's answer.
///
/// Files larger than `INLINE_MAX_BYTES` are uploaded through the Files API;
/// smaller ones are embedded inline. Either way the resulting content part
/// is passed to `run_generate_content` together with `question`.
///
/// Errors from building the part are propagated as `Err`.
async fn try_native_video(
    &self,
    video_path: &str,
    mime_type: &'static str,
    size: u64,
    question: &str,
) -> super::error::Result<ToolResult> {
    let needs_files_api = size > INLINE_MAX_BYTES;

    let part = if needs_files_api {
        tracing::info!(
            "analyze_video: file size {} > {} inline cap — using Files API",
            size,
            INLINE_MAX_BYTES,
        );
        self.upload_via_files_api(video_path, mime_type, size)
            .await?
    } else {
        self.build_inline_part(video_path, mime_type).await?
    };

    self.run_generate_content(part, question).await
}
/// Fallback analysis: sample still frames with ffmpeg and describe each one.
///
/// Used when the native Gemini video path failed; `native_err` is that
/// failure's message and is echoed in every result this function produces.
///
/// Steps:
/// 1. Verify `ffmpeg` can be run.
/// 2. Extract up to `FALLBACK_MAX_FRAMES` JPEG frames at `FALLBACK_FPS` into a
///    temporary directory (removed when this function returns).
/// 3. Run the image-analysis tool on each frame, in order.
/// 4. Join the per-frame descriptions under an explanatory header.
///
/// Returns `Ok(ToolResult::error(..))` when ffmpeg is unavailable, fails,
/// produces no frames, or when not a single frame could be analyzed.
/// Returns `Err` only for local failures (temp dir, spawning ffmpeg,
/// reading the frame directory).
async fn frame_extraction_fallback(
    &self,
    video_path: &str,
    question: &str,
    native_err: String,
    context: &ToolExecutionContext,
) -> super::error::Result<ToolResult> {
    if !ffmpeg_available().await {
        return Ok(ToolResult::error(format!(
            "Video analysis failed. Native Gemini video upload errored ({native_err}) and \
             the ffmpeg frame-extraction fallback is unavailable: `ffmpeg` is not installed \
             or not on PATH. Install ffmpeg to enable frame-based video analysis."
        )));
    }

    // `tmp` must stay alive until the frames have been analyzed: dropping it
    // deletes the directory.
    let tmp = tempfile::Builder::new()
        .prefix("opencrabs-video-frames-")
        .tempdir()
        .map_err(|e| {
            super::error::ToolError::Execution(format!(
                "Failed to create temp dir for frame extraction: {e}"
            ))
        })?;
    let pattern = tmp.path().join("frame_%03d.jpg");
    let pattern_str = pattern.to_string_lossy().to_string();
    let fps_filter = format!("fps={FALLBACK_FPS}");
    let max_frames = FALLBACK_MAX_FRAMES.to_string();

    let output = tokio::process::Command::new("ffmpeg")
        .args([
            "-hide_banner",
            "-loglevel",
            "error",
            "-i",
            video_path,
            "-vf",
            fps_filter.as_str(),
            "-frames:v",
            max_frames.as_str(),
            "-q:v",
            "3",
            pattern_str.as_str(),
        ])
        .output()
        .await
        .map_err(|e| {
            super::error::ToolError::Execution(format!("Failed to spawn ffmpeg: {e}"))
        })?;
    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        return Ok(ToolResult::error(format!(
            "Video analysis failed. Native Gemini video upload errored ({native_err}) and \
             ffmpeg frame extraction failed: {}",
            stderr.trim()
        )));
    }

    // Collect the extracted frames; the zero-padded names make a plain sort
    // chronological.
    let mut frames: Vec<std::path::PathBuf> = Vec::new();
    let mut entries = tokio::fs::read_dir(tmp.path()).await.map_err(|e| {
        super::error::ToolError::Execution(format!("Failed to read frame dir: {e}"))
    })?;
    while let Some(entry) = entries.next_entry().await.map_err(|e| {
        super::error::ToolError::Execution(format!("Failed to iterate frame dir: {e}"))
    })? {
        let path = entry.path();
        if path.extension().and_then(|e| e.to_str()) == Some("jpg") {
            frames.push(path);
        }
    }
    frames.sort();
    if frames.is_empty() {
        return Ok(ToolResult::error(format!(
            "Video analysis failed. Native Gemini video upload errored ({native_err}) and \
             ffmpeg produced no frames (unreadable or zero-length video?)."
        )));
    }

    tracing::info!(
        "analyze_video fallback: extracted {} frame(s), analyzing each with Gemini vision",
        frames.len()
    );
    let vision =
        super::analyze_image::AnalyzeImageTool::new(self.api_key.clone(), self.model.clone());
    let total = frames.len();
    let mut sections: Vec<String> = Vec::with_capacity(total);
    // Track outcomes so a run in which nothing could be analyzed is not
    // reported as a success.
    let mut analyzed_ok: usize = 0;
    let mut last_failure: Option<String> = None;

    for (idx, frame) in frames.iter().enumerate() {
        let approx_secs = (idx as f64) / FALLBACK_FPS;
        let per_frame_q = format!(
            "This is frame {} of {} extracted from a video (≈{:.0}s in). Describe \
             concisely what is visible and any action or change. The user ultimately \
             asked: {}",
            idx + 1,
            total,
            approx_secs,
            question
        );
        let frame_path = frame.to_string_lossy().to_string();
        let res = vision
            .execute(
                serde_json::json!({ "image": frame_path, "question": per_frame_q }),
                context,
            )
            .await;
        let desc = match res {
            Ok(r) if r.success => {
                analyzed_ok += 1;
                r.output.trim().to_string()
            }
            Ok(r) => {
                let reason = r.error.unwrap_or_else(|| "unknown".to_string());
                let line = format!("[frame analysis failed: {}]", reason);
                last_failure = Some(reason);
                line
            }
            Err(e) => {
                let reason = e.to_string();
                let line = format!("[frame analysis failed: {reason}]");
                last_failure = Some(reason);
                line
            }
        };
        sections.push(format!(
            "Frame {} (≈{:.0}s): {}",
            idx + 1,
            approx_secs,
            desc
        ));
    }

    // If every per-frame call failed there is no analysis to return; surface
    // an error instead of a "successful" list of failure placeholders.
    if analyzed_ok == 0 {
        return Ok(ToolResult::error(format!(
            "Video analysis failed. Native Gemini video upload errored ({native_err}) and \
             the frame-extraction fallback could not analyze any of the {total} extracted \
             frame(s). Last frame error: {}",
            last_failure.unwrap_or_else(|| "unknown".to_string())
        )));
    }

    let body = sections.join("\n\n");
    let header = format!(
        "[Frame-extraction fallback — native Gemini video upload was unavailable \
         ({native_err}). Analyzed {total} frame(s) sampled at {FALLBACK_FPS} fps. \
         The descriptions below are per-frame, in chronological order.]\n\n"
    );
    Ok(ToolResult::success(format!("{header}{body}")))
}
async fn build_inline_part(
&self,
path: &str,
mime_type: &'static str,
) -> super::error::Result<Value> {
let bytes = tokio::fs::read(path).await.map_err(|e| {
super::error::ToolError::Execution(format!(
"Failed to read video file '{}': {}",
path, e
))
})?;
let b64 = super::analyze_image::base64_encode(&bytes);
Ok(serde_json::json!({
"inlineData": {
"mimeType": mime_type,
"data": b64
}
}))
}
async fn upload_via_files_api(
&self,
path: &str,
mime_type: &'static str,
size: u64,
) -> super::error::Result<Value> {
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(600))
.build()
.map_err(|e| super::error::ToolError::Execution(e.to_string()))?;
let display_name = std::path::Path::new(path)
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("video");
let init_body = serde_json::json!({
"file": { "display_name": display_name }
});
let init_resp = client
.post(GEMINI_UPLOAD_URL)
.header("x-goog-api-key", &self.api_key)
.header("X-Goog-Upload-Protocol", "resumable")
.header("X-Goog-Upload-Command", "start")
.header("X-Goog-Upload-Header-Content-Length", size.to_string())
.header("X-Goog-Upload-Header-Content-Type", mime_type)
.header("Content-Type", "application/json")
.json(&init_body)
.send()
.await
.map_err(|e| super::error::ToolError::Execution(e.to_string()))?;
if !init_resp.status().is_success() {
let status = init_resp.status();
let body = init_resp.text().await.unwrap_or_default();
return Err(super::error::ToolError::Execution(format!(
"Files API resumable-start failed: HTTP {} — {}",
status, body
)));
}
let upload_url = init_resp
.headers()
.get("x-goog-upload-url")
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string())
.ok_or_else(|| {
super::error::ToolError::Execution(
"Files API resumable-start: missing X-Goog-Upload-URL header".to_string(),
)
})?;
let bytes = tokio::fs::read(path).await.map_err(|e| {
super::error::ToolError::Execution(format!(
"Failed to read video file '{}': {}",
path, e
))
})?;
let upload_resp = client
.post(&upload_url)
.header("Content-Length", bytes.len().to_string())
.header("X-Goog-Upload-Offset", "0")
.header("X-Goog-Upload-Command", "upload, finalize")
.body(bytes)
.send()
.await
.map_err(|e| super::error::ToolError::Execution(e.to_string()))?;
if !upload_resp.status().is_success() {
let status = upload_resp.status();
let body = upload_resp.text().await.unwrap_or_default();
return Err(super::error::ToolError::Execution(format!(
"Files API upload failed: HTTP {} — {}",
status, body
)));
}
let upload_json: Value = upload_resp.json().await.map_err(|e| {
super::error::ToolError::Execution(format!(
"Files API upload: failed to parse JSON response: {}",
e
))
})?;
let file_name = upload_json["file"]["name"]
.as_str()
.ok_or_else(|| {
super::error::ToolError::Execution(
"Files API upload: missing file.name in response".to_string(),
)
})?
.to_string();
let file_uri = upload_json["file"]["uri"]
.as_str()
.ok_or_else(|| {
super::error::ToolError::Execution(
"Files API upload: missing file.uri in response".to_string(),
)
})?
.to_string();
let deadline = std::time::Instant::now() + FILES_API_POLL_TIMEOUT;
loop {
if std::time::Instant::now() >= deadline {
return Err(super::error::ToolError::Execution(format!(
"Files API upload: file '{}' did not reach ACTIVE state within {}s",
file_name,
FILES_API_POLL_TIMEOUT.as_secs()
)));
}
let status_resp = client
.get(format!("{}/{}", GEMINI_BASE_URL, file_name))
.header("x-goog-api-key", &self.api_key)
.send()
.await
.map_err(|e| super::error::ToolError::Execution(e.to_string()))?;
if !status_resp.status().is_success() {
let status = status_resp.status();
let body = status_resp.text().await.unwrap_or_default();
return Err(super::error::ToolError::Execution(format!(
"Files API state poll failed: HTTP {} — {}",
status, body
)));
}
let status_json: Value = status_resp.json().await.map_err(|e| {
super::error::ToolError::Execution(format!(
"Files API state poll: failed to parse JSON: {}",
e
))
})?;
let state = status_json["state"].as_str().unwrap_or("").to_string();
tracing::debug!("analyze_video: file '{}' state={}", file_name, state);
match state.as_str() {
"ACTIVE" => break,
"FAILED" => {
return Err(super::error::ToolError::Execution(format!(
"Files API: upload '{}' entered FAILED state",
file_name
)));
}
_ => {
tokio::time::sleep(FILES_API_POLL_INTERVAL).await;
}
}
}
Ok(serde_json::json!({
"fileData": {
"mimeType": mime_type,
"fileUri": file_uri
}
}))
}
async fn run_generate_content(
&self,
video_part: Value,
question: &str,
) -> super::error::Result<ToolResult> {
let url = format!("{}/models/{}:generateContent", GEMINI_BASE_URL, self.model);
let body = serde_json::json!({
"contents": [{
"parts": [
video_part,
{ "text": question }
]
}]
});
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(300))
.build()
.map_err(|e| super::error::ToolError::Execution(e.to_string()))?;
tracing::info!(
"analyze_video: calling Gemini generateContent model={} url={}",
self.model,
url,
);
let response = client
.post(&url)
.header("Content-Type", "application/json")
.header("x-goog-api-key", &self.api_key)
.json(&body)
.send()
.await
.map_err(|e| super::error::ToolError::Execution(e.to_string()))?;
let status = response.status().as_u16();
let body_text = response.text().await.map_err(|e| {
super::error::ToolError::Execution(format!("Failed to read response body: {}", e))
})?;
tracing::info!(
"analyze_video: Gemini HTTP status={} body[..300]={}",
status,
&body_text.chars().take(300).collect::<String>()
);
if !(200..300).contains(&status) {
return Ok(ToolResult::error(format!(
"Gemini API error {}: {}",
status, body_text
)));
}
let json: Value = serde_json::from_str(&body_text).map_err(|e| {
super::error::ToolError::Execution(format!(
"Failed to parse Gemini JSON response: {}. Body[..500]: {}",
e,
&body_text.chars().take(500).collect::<String>()
))
})?;
let empty_vec = vec![];
let candidates = json["candidates"].as_array().unwrap_or(&empty_vec);
let mut result_text = String::new();
for candidate in candidates {
let empty_parts = vec![];
let parts = candidate["content"]["parts"]
.as_array()
.unwrap_or(&empty_parts);
for part in parts {
if let Some(text) = part["text"].as_str() {
result_text.push_str(text);
}
}
}
if result_text.is_empty() {
Ok(ToolResult::error(
"No text response from Gemini video analysis".to_string(),
))
} else {
Ok(ToolResult::success(result_text))
}
}
}
/// Reports whether `ffmpeg -version` can be run and exits successfully.
///
/// Output of the probe is discarded. Any failure to run the command, or a
/// non-success exit status, yields `false`.
async fn ffmpeg_available() -> bool {
    let probe = tokio::process::Command::new("ffmpeg")
        .arg("-version")
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null())
        .status()
        .await;

    match probe {
        Ok(exit) => exit.success(),
        Err(_) => false,
    }
}
/// Guesses a video MIME type from the (case-insensitive) suffix of `path`.
///
/// Recognized suffixes: `.mp4`, `.m4v`, `.mov`, `.webm`, `.mkv`, `.avi`,
/// `.3gp`, `.flv`. Anything else, including an empty path, maps to
/// `"video/mp4"`.
pub(crate) fn detect_video_mime_type(path: &str) -> &'static str {
    // Suffix -> MIME type, checked in order; the first match wins.
    const SUFFIX_TO_MIME: [(&str, &str); 8] = [
        (".mp4", "video/mp4"),
        (".m4v", "video/mp4"),
        (".mov", "video/quicktime"),
        (".webm", "video/webm"),
        (".mkv", "video/x-matroska"),
        (".avi", "video/x-msvideo"),
        (".3gp", "video/3gpp"),
        (".flv", "video/x-flv"),
    ];
    const DEFAULT_MIME: &str = "video/mp4";

    let lowered = path.to_lowercase();
    SUFFIX_TO_MIME
        .iter()
        .find(|&&(suffix, _)| lowered.ends_with(suffix))
        .map_or(DEFAULT_MIME, |&(_, mime)| mime)
}