use anyhow::Result;
use chrono::{DateTime, Duration, Local, NaiveTime, Utc};
use rusqlite::{params, Connection};
use std::sync::Arc;
use tokio::sync::mpsc;
use crate::ai::{elevenlabs::ElevenLabsClient, speech::condense_for_speech};
use crate::config::Config;
/// Cloneable handle used to submit [`SpeakRequest`]s for speech output.
///
/// A handle is obtained from `VoiceQueue::spawn`, which also starts the task
/// that receives the requests sent through it.
#[derive(Clone)]
pub struct VoiceQueue {
/// Sending half of the unbounded channel of speak requests.
tx: mpsc::UnboundedSender<SpeakRequest>,
}
/// A single utterance to be spoken through the voice queue.
#[derive(Debug, Clone)]
pub struct SpeakRequest {
/// The text handed to the TTS client.
pub text: String,
/// Label identifying where the request came from; it is included in the
/// queue's log lines (e.g. `"daily_greeting"` or `"advice:<id prefix>"`).
pub source: String,
}
impl VoiceQueue {
    /// Submits `req` to the background speech task.
    ///
    /// This is best-effort: if the receiving task is gone the send error is
    /// deliberately ignored and the request is dropped.
    pub fn enqueue(&self, req: SpeakRequest) {
        let _ = self.tx.send(req);
    }

    /// Starts the background speech task and returns a handle to its queue.
    ///
    /// Returns `None` (after logging the reason) when voice output cannot be
    /// used:
    /// - `cfg.tts.enabled` is false,
    /// - `cfg.tts.api_key` or `cfg.tts.voice_id` is not set,
    /// - the ElevenLabs client fails to initialise.
    ///
    /// The spawned task processes requests one at a time in arrival order.
    /// Requests that arrive during the configured quiet hours are skipped, and
    /// failures reported by the client are logged without stopping the task.
    ///
    /// Calls `tokio::spawn`, so it is expected to run inside a Tokio runtime.
    pub fn spawn(cfg: &Config) -> Option<Self> {
        if !cfg.tts.enabled {
            tracing::info!("[voice] TTS off — voice queue 비활성");
            return None;
        }
        // Missing credentials used to disable the queue without any trace;
        // log them like the other "disabled" paths so misconfiguration is visible.
        let Some(key) = cfg.tts.api_key.clone() else {
            tracing::warn!("[voice] TTS enabled but api_key is missing — voice queue disabled");
            return None;
        };
        let Some(voice_id) = cfg.tts.voice_id.clone() else {
            tracing::warn!("[voice] TTS enabled but voice_id is missing — voice queue disabled");
            return None;
        };
        let quiet_hours = cfg.tts.quiet_hours.clone();
        let client = match ElevenLabsClient::new(key) {
            Ok(c) => Arc::new(c),
            Err(e) => {
                tracing::warn!("[voice] ElevenLabs init 실패: {}", e);
                return None;
            }
        };
        let (tx, mut rx) = mpsc::unbounded_channel::<SpeakRequest>();
        let voice_id_arc = Arc::new(voice_id);
        tokio::spawn(async move {
            // The loop ends once every sender (every `VoiceQueue` clone) is dropped.
            while let Some(req) = rx.recv().await {
                // Quiet hours are evaluated when the request is dequeued, not
                // when it was enqueued.
                if in_quiet_hours(quiet_hours.as_deref()) {
                    tracing::debug!("[voice] quiet hours — skip: {}", req.source);
                    continue;
                }
                tracing::info!("[voice] speak ({}): {}", req.source, req.text);
                if let Err(e) = client.speak(&req.text, &voice_id_arc).await {
                    tracing::warn!("[voice] speak failed ({}): {}", req.source, e);
                }
            }
        });
        Some(Self { tx })
    }
}
/// Tells whether the current local time falls inside the quiet-hours window.
///
/// `spec` is expected to look like `"HH:MM-HH:MM"` (start, then end);
/// whitespace around either time is ignored. The start is inclusive and the
/// end is exclusive. When the start is later than the end the window wraps
/// past midnight. When both times are equal the window is empty.
///
/// Returns `false` when `spec` is `None` or cannot be parsed.
pub fn in_quiet_hours(spec: Option<&str>) -> bool {
    let Some(window) = spec else {
        return false;
    };

    // Exactly two dash-separated pieces are required.
    let mut pieces = window.split('-');
    let (Some(from), Some(until), None) = (pieces.next(), pieces.next(), pieces.next()) else {
        return false;
    };

    let parse = |raw: &str| NaiveTime::parse_from_str(raw.trim(), "%H:%M").ok();
    let (Some(start), Some(end)) = (parse(from), parse(until)) else {
        return false;
    };

    let now = Local::now().time();
    let at_or_after_start = now >= start;
    let before_end = now < end;
    if start <= end {
        // Same-day window: both bounds must hold.
        at_or_after_start && before_end
    } else {
        // Window wraps past midnight: either side of midnight qualifies.
        at_or_after_start || before_end
    }
}
pub fn already_greeted_today(conn: &Connection, user_id: &str) -> Result<bool> {
let mut stmt = conn.prepare(
"SELECT created_at FROM events
WHERE user_id = ?1 AND event_type = 'voice.greeting'
ORDER BY created_at DESC LIMIT 1",
)?;
let last: Option<String> = stmt.query_row(params![user_id], |r| r.get(0)).ok();
let Some(last_str) = last else {
return Ok(false);
};
let Ok(last_dt) = DateTime::parse_from_rfc3339(&last_str) else {
return Ok(false);
};
let last_local = last_dt.with_timezone(&Local).date_naive();
let today = Local::now().date_naive();
Ok(last_local == today)
}
/// Records that a greeting with text `body` was issued for `user_id`.
///
/// Inserts a `voice.greeting` event under the placeholder project `"_"` whose
/// payload is `{"text": body}`.
///
/// # Errors
///
/// Propagates any error returned by the event insert.
pub fn mark_greeted(conn: &Connection, user_id: &str, body: &str) -> Result<()> {
    let greeting_event = crate::db::event::EventInput {
        user_id: user_id.into(),
        project: "_".into(),
        event_type: "voice.greeting".into(),
        path: None,
        payload: serde_json::json!({"text": body}),
    };
    crate::db::event::insert(conn, greeting_event)?;
    Ok(())
}
/// Per-project summary used to compose the daily greeting.
#[derive(Debug, Clone)]
pub struct ProjectInsight {
/// Project name.
pub project: String,
/// Whole days since the project's latest `hook.user_prompt` event.
/// `survey_projects` stores `-1` when the project has no such event.
pub idle_days: i64,
/// Number of advice rows for the project that are not yet confirmed.
pub pending_advice: usize,
/// Shortened text of the latest user prompt, if one was recorded
/// (`survey_projects` keeps at most 50 characters and appends `…` when it cuts).
pub last_prompt_preview: Option<String>,
}
pub fn survey_projects(conn: &Connection, user_id: &str) -> Result<Vec<ProjectInsight>> {
let projects = crate::db::project::list(conn, user_id)?;
let now = Utc::now();
let mut out = Vec::with_capacity(projects.len());
for p in projects {
let mut stmt = conn.prepare(
"SELECT created_at, json_extract(payload, '$.prompt') FROM events
WHERE user_id = ?1 AND project = ?2
AND event_type = 'hook.user_prompt'
ORDER BY created_at DESC LIMIT 1",
)?;
let last: Option<(String, Option<String>)> = stmt
.query_row(params![user_id, &p.name], |r| Ok((r.get(0)?, r.get(1)?)))
.ok();
let (idle_days, last_prompt) = match last {
Some((ts, prompt)) => {
let dt = DateTime::parse_from_rfc3339(&ts)
.map(|d| d.with_timezone(&Utc))
.unwrap_or(now);
let days = (now - dt).num_days();
(days, prompt)
}
None => (-1, None), };
let pending_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM advice
WHERE user_id = ?1 AND project = ?2 AND confirmed_at IS NULL",
params![user_id, &p.name],
|r| r.get(0),
)
.unwrap_or(0);
out.push(ProjectInsight {
project: p.name,
idle_days,
pending_advice: pending_count as usize,
last_prompt_preview: last_prompt.map(|p| {
let s: String = p.chars().take(50).collect();
if p.chars().count() > 50 {
format!("{}…", s)
} else {
s
}
}),
});
}
Ok(out)
}
/// Builds the daily greeting text from per-project insights.
///
/// A project counts as stalled when it has been idle for at least 3 days and
/// has a prompt preview. The greeting mentions the total number of pending
/// advice items (when non-zero), describes the first two stalled projects in
/// input order, and summarises how many further stalled projects remain.
///
/// Returns `None` when there is no stalled project and no pending advice.
pub fn compose_greeting(insights: &[ProjectInsight]) -> Option<String> {
    fn is_stalled(item: &ProjectInsight) -> bool {
        item.last_prompt_preview.is_some() && item.idle_days >= 3
    }

    let stalled_count = insights.iter().filter(|item| is_stalled(item)).count();
    let pending_total = insights
        .iter()
        .fold(0usize, |acc, item| acc + item.pending_advice);

    if pending_total == 0 && stalled_count == 0 {
        return None;
    }

    let mut greeting = String::from("안녕하세요. ");
    if pending_total > 0 {
        greeting += &format!("보류 중인 advice 가 {}건 있습니다. ", pending_total);
    }

    // Only the first two stalled projects are spelled out.
    for item in insights.iter().filter(|item| is_stalled(item)).take(2) {
        let preview = item.last_prompt_preview.as_deref().unwrap_or("");
        greeting += &format!(
            "{} 가 {}일째 멈춰있어요. 마지막에 {}. ",
            item.project, item.idle_days, preview
        );
    }

    // The rest are reduced to a count.
    if stalled_count > 2 {
        greeting += &format!("외 {}개 프로젝트도 정체 중. ", stalled_count - 2);
    }

    Some(greeting)
}
/// Issues the once-per-day greeting for `user_id` when appropriate.
///
/// Does nothing when a greeting was already recorded today or when the
/// current time is inside `quiet_hours`. Otherwise the projects are surveyed
/// and:
/// - if there is something to say, the text is enqueued on `queue` with source
///   `"daily_greeting"` and then recorded via `mark_greeted`;
/// - if there is nothing to say, a `voice.greeting` event with payload
///   `{"text": null, "skipped": "all_clear"}` is recorded instead.
///
/// # Errors
///
/// Propagates errors from the greeted-today check, the project survey, and the
/// event inserts.
pub fn try_daily_greeting(
    conn: &Connection,
    user_id: &str,
    queue: &VoiceQueue,
    quiet_hours: Option<&str>,
) -> Result<()> {
    if already_greeted_today(conn, user_id)? || in_quiet_hours(quiet_hours) {
        return Ok(());
    }

    let greeting = compose_greeting(&survey_projects(conn, user_id)?);
    match greeting {
        Some(text) => {
            let request = SpeakRequest {
                text: text.clone(),
                source: "daily_greeting".into(),
            };
            queue.enqueue(request);
            mark_greeted(conn, user_id, &text)
        }
        None => {
            // Nothing worth saying: still record a greeting event for today.
            let all_clear = crate::db::event::EventInput {
                user_id: user_id.into(),
                project: "_".into(),
                event_type: "voice.greeting".into(),
                path: None,
                payload: serde_json::json!({"text": null, "skipped": "all_clear"}),
            };
            crate::db::event::insert(conn, all_clear)?;
            Ok(())
        }
    }
}
/// Enqueues speech for pending advice that has not been spoken before.
///
/// Fetches up to `limit` pending advice items for `user_id`. Each item without
/// a matching `voice.spoken` event is condensed for speech, enqueued on
/// `queue` with source `"advice:<first 8 characters of the id>"`, and then
/// recorded as a `voice.spoken` event carrying the advice id.
///
/// Returns the number of items enqueued by this call.
///
/// # Errors
///
/// Propagates errors from listing pending advice, from the already-spoken
/// lookup, and from the event insert. Items enqueued before a failure stay
/// enqueued.
pub fn try_speak_new_advice(
    conn: &Connection,
    user_id: &str,
    queue: &VoiceQueue,
    limit: usize,
) -> Result<usize> {
    let pending = crate::db::advice::list_pending(conn, user_id, None, limit)?;
    let mut spoken = 0usize;
    for adv in pending {
        if already_spoken(conn, user_id, &adv.id)? {
            continue;
        }
        let text = condense_for_speech(&adv.text, &adv.severity, &adv.project);
        // Take the first 8 characters rather than the first 8 bytes: byte
        // slicing panics when offset 8 falls inside a multi-byte character.
        let short_id: String = adv.id.chars().take(8).collect();
        queue.enqueue(SpeakRequest {
            text,
            source: format!("advice:{}", short_id),
        });
        crate::db::event::insert(
            conn,
            crate::db::event::EventInput {
                user_id: user_id.into(),
                project: adv.project.clone(),
                event_type: "voice.spoken".into(),
                path: None,
                payload: serde_json::json!({"advice_id": adv.id}),
            },
        )?;
        spoken += 1;
    }
    Ok(spoken)
}
/// Reports whether a `voice.spoken` event already exists for `advice_id`
/// under `user_id`.
///
/// # Errors
///
/// Returns an error when the statement cannot be prepared or the query fails.
/// Query failures used to be reported as "not spoken", which would let the
/// same advice be spoken again.
fn already_spoken(conn: &Connection, user_id: &str, advice_id: &str) -> Result<bool> {
    let mut stmt = conn.prepare(
        "SELECT 1 FROM events
         WHERE user_id = ?1 AND event_type = 'voice.spoken'
         AND json_extract(payload, '$.advice_id') = ?2
         LIMIT 1",
    )?;
    // `exists` yields `Ok(false)` for an empty result and keeps real errors as errors.
    Ok(stmt.exists(params![user_id, advice_id])?)
}
/// Returns the greeting that would be composed for `user_id` right now.
///
/// Nothing is enqueued or recorded. `Ok(None)` means there is currently
/// nothing to report.
///
/// # Errors
///
/// Propagates any error from the project survey.
pub fn preview_greeting(conn: &Connection, user_id: &str) -> Result<Option<String>> {
    survey_projects(conn, user_id).map(|insights| compose_greeting(&insights))
}
pub fn last_greeting_time(conn: &Connection, user_id: &str) -> Result<Option<String>> {
let mut stmt = conn.prepare(
"SELECT created_at FROM events
WHERE user_id = ?1 AND event_type = 'voice.greeting'
ORDER BY created_at DESC LIMIT 1",
)?;
Ok(stmt.query_row(params![user_id], |r| r.get(0)).ok())
}
// No-op that only names `Duration` in its signature.
// NOTE(review): it appears to exist solely so the `chrono::Duration` import is
// referenced somewhere (avoiding an unused-import warning) — confirm, and
// consider dropping both the import and this function if so.
// `dead_code` is allowed because nothing in the visible code calls it.
#[allow(dead_code)]
fn _duration_marker(_: Duration) {}