use crate::clangd::index::ProgressEvent;
use crate::lsp::protocol::JsonRpcNotification;
use lsp_types::{notification::Notification, request::Request};
use serde_json::Value;
use std::sync::Arc;
use tokio::sync::{Mutex, mpsc};
use tracing::{debug, trace, warn};
#[derive(Debug, Clone, PartialEq)]
pub enum IndexingStatus {
NotStarted,
InProgress {
current: u32,
total: u32,
percentage: u8,
message: Option<String>,
},
Completed,
Failed(String),
}
#[derive(Debug)]
struct IndexingState {
status: IndexingStatus,
progress_token: Option<String>,
}
impl Default for IndexingState {
fn default() -> Self {
Self {
status: IndexingStatus::NotStarted,
progress_token: None,
}
}
}
#[derive(Clone)]
pub struct IndexProgressMonitor {
state: Arc<Mutex<IndexingState>>,
progress_sender: Option<mpsc::Sender<ProgressEvent>>,
}
impl IndexProgressMonitor {
pub fn new() -> Self {
Self {
state: Arc::new(Mutex::new(IndexingState::default())),
progress_sender: None,
}
}
pub fn with_sender(sender: mpsc::Sender<ProgressEvent>) -> Self {
Self {
state: Arc::new(Mutex::new(IndexingState::default())),
progress_sender: Some(sender),
}
}
pub fn create_handler(&self) -> impl Fn(JsonRpcNotification) + Send + Sync + 'static {
let state = Arc::clone(&self.state);
let progress_sender = self.progress_sender.clone();
move |notification| {
let state = Arc::clone(&state);
let progress_sender = progress_sender.clone();
tokio::spawn(async move {
Self::process_notification_internal(notification, state, progress_sender).await;
});
}
}
pub async fn get_progress(&self) -> IndexingStatus {
let state = self.state.lock().await;
state.status.clone()
}
pub async fn reset(&self) {
let mut state = self.state.lock().await;
*state = IndexingState::default();
debug!("IndexProgressMonitor: Reset indexing state");
}
async fn process_notification_internal(
notification: JsonRpcNotification,
state: Arc<Mutex<IndexingState>>,
progress_sender: Option<mpsc::Sender<ProgressEvent>>,
) {
trace!(
"IndexProgressMonitor: Processing notification: {}",
notification.method
);
match notification.method.as_str() {
lsp_types::request::WorkDoneProgressCreate::METHOD => {
Self::handle_progress_create(notification.params, state).await;
}
lsp_types::notification::Progress::METHOD => {
Self::handle_progress_update(notification.params, state, progress_sender).await;
}
_ => {
trace!(
"IndexProgressMonitor: Ignoring notification: {}",
notification.method
);
}
}
}
async fn handle_progress_create(params: Option<Value>, state: Arc<Mutex<IndexingState>>) {
if let Some(params) = params
&& let Some(token) = params.get("token").and_then(|t| t.as_str())
&& token == "backgroundIndexProgress"
{
let mut state = state.lock().await;
state.progress_token = Some(token.to_string());
debug!("IndexProgressMonitor: Tracking progress token: {}", token);
}
}
async fn handle_progress_update(
params: Option<Value>,
state: Arc<Mutex<IndexingState>>,
progress_sender: Option<mpsc::Sender<ProgressEvent>>,
) {
if let Some(params) = params {
let token = params.get("token").and_then(|t| t.as_str());
if token != Some("backgroundIndexProgress") {
return;
}
if let Some(value) = params.get("value") {
let kind = value.get("kind").and_then(|k| k.as_str());
match kind {
Some("begin") => {
let percentage = value
.get("percentage")
.and_then(|p| p.as_u64())
.unwrap_or(0) as u8;
let title = value.get("title").and_then(|t| t.as_str());
let mut state = state.lock().await;
state.status = IndexingStatus::InProgress {
current: 0,
total: 1, percentage,
message: title.map(|s| s.to_string()),
};
debug!(
"IndexProgressMonitor: Indexing started - {}",
title.unwrap_or("indexing")
);
if let Some(ref sender) = progress_sender
&& sender
.try_send(ProgressEvent::OverallIndexingStarted)
.is_err()
{
warn!(
"IndexProgressMonitor: Failed to send OverallIndexingStarted event"
);
}
}
Some("report") => {
let percentage = value
.get("percentage")
.and_then(|p| p.as_u64())
.unwrap_or(0) as u8;
let message = value.get("message").and_then(|m| m.as_str());
let (current, total) = if let Some(msg) = message {
if let Some((c, t)) = parse_progress_message(msg) {
(c, t)
} else {
(0, 1)
}
} else {
(0, 1)
};
let mut state = state.lock().await;
state.status = IndexingStatus::InProgress {
current,
total,
percentage,
message: message.map(|s| s.to_string()),
};
trace!(
"IndexProgressMonitor: Progress update - {}% ({})",
percentage,
message.unwrap_or("")
);
if let Some(ref sender) = progress_sender {
let event = ProgressEvent::OverallProgress {
current,
total,
percentage,
message: message.map(|s| s.to_string()),
};
if sender.try_send(event).is_err() {
warn!("IndexProgressMonitor: Failed to send OverallProgress event");
}
}
}
Some("end") => {
let mut state = state.lock().await;
state.status = IndexingStatus::Completed;
drop(state);
debug!("IndexProgressMonitor: Indexing completed");
if let Some(ref sender) = progress_sender
&& sender.try_send(ProgressEvent::OverallCompleted).is_err()
{
warn!("IndexProgressMonitor: Failed to send OverallCompleted event");
}
}
_ => {
trace!("IndexProgressMonitor: Unknown progress kind: {:?}", kind);
}
}
}
}
}
}
fn parse_progress_message(message: &str) -> Option<(u32, u32)> {
let parts: Vec<&str> = message.split('/').collect();
if parts.len() == 2
&& let (Ok(current), Ok(total)) = (parts[0].parse::<u32>(), parts[1].parse::<u32>())
{
return Some((current, total));
}
None
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn test_parse_progress_message() {
assert_eq!(parse_progress_message("0/7"), Some((0, 7)));
assert_eq!(parse_progress_message("3/7"), Some((3, 7)));
assert_eq!(parse_progress_message("invalid"), None);
assert_eq!(parse_progress_message("1/2/3"), None);
assert_eq!(parse_progress_message("a/b"), None);
}
#[tokio::test]
async fn test_index_progress_monitor_creation() {
let monitor = IndexProgressMonitor::new();
let status = monitor.get_progress().await;
assert_eq!(status, IndexingStatus::NotStarted);
}
#[tokio::test]
async fn test_index_progress_monitor_reset() {
let monitor = IndexProgressMonitor::new();
{
let mut state = monitor.state.lock().await;
state.status = IndexingStatus::InProgress {
current: 1,
total: 5,
percentage: 20,
message: Some("test".to_string()),
};
}
monitor.reset().await;
let status = monitor.get_progress().await;
assert_eq!(status, IndexingStatus::NotStarted);
}
#[tokio::test]
async fn test_notification_handler_creation() {
let monitor = IndexProgressMonitor::new();
let handler = monitor.create_handler();
let notification = JsonRpcNotification {
jsonrpc: crate::lsp::jsonrpc_utils::JSONRPC_VERSION.to_string(),
method: "test".to_string(),
params: None,
};
handler(notification);
}
#[tokio::test]
async fn test_progress_create_notification() {
let monitor = IndexProgressMonitor::new();
let state = Arc::clone(&monitor.state);
let params = json!({
"token": "backgroundIndexProgress"
});
IndexProgressMonitor::handle_progress_create(Some(params), state.clone()).await;
let state = state.lock().await;
assert_eq!(
state.progress_token,
Some("backgroundIndexProgress".to_string())
);
}
#[tokio::test]
async fn test_progress_update_begin() {
let monitor = IndexProgressMonitor::new();
let state = Arc::clone(&monitor.state);
let params = json!({
"token": "backgroundIndexProgress",
"value": {
"kind": "begin",
"percentage": 0,
"title": "indexing"
}
});
IndexProgressMonitor::handle_progress_update(Some(params), state.clone(), None).await;
let state = state.lock().await;
if let IndexingStatus::InProgress {
percentage,
message,
..
} = &state.status
{
assert_eq!(*percentage, 0);
assert_eq!(*message, Some("indexing".to_string()));
} else {
panic!("Expected InProgress status");
}
}
#[tokio::test]
async fn test_progress_update_end() {
let monitor = IndexProgressMonitor::new();
let state = Arc::clone(&monitor.state);
let params = json!({
"token": "backgroundIndexProgress",
"value": {
"kind": "end"
}
});
IndexProgressMonitor::handle_progress_update(Some(params), state.clone(), None).await;
let state = state.lock().await;
assert_eq!(state.status, IndexingStatus::Completed);
}
}