v_queue 0.3.1

simple file based queue
Documentation
---
name: v-queue-server-fixes
overview: "Доработка HTTP-сервера v-queue-server: семантика acknowledgement в API, валидация входов, лимиты, удаление глобальных блокировок и устранение panic-источников. Зависит от плана queue-reliability-fixes только в части предположений о поведении ядра."
todos: []
isProject: false
---


# План доработки v-queue-server

Этот план покрывает только серверный crate в [v-queue-server/](v-queue-server/). Правки ядра очереди вынесены в отдельный план `queue-reliability-fixes`.

## Принципы

- Не меняем on-wire JSON формат API без необходимости. Новые параметры — опциональные с разумным default.
- Каждая фаза самостоятельно компилируется и проходит тесты в [v-queue-server/tests/]v-queue-server/tests/.
- Серверные правки могут начинаться параллельно с фазой 1 ядра, но `consume_messages` должен дождаться фикса `pop_body` (см. ниже).

---

## Фаза 1. Критические проблемы API

### 1.1 Acknowledgement в `consume_messages`

Файл: [v-queue-server/src/api.rs](v-queue-server/src/api.rs), функция `consume_messages` (строки 154–279)

Проблема: `pop_body` уже двигает in-memory `pos_record`, но `commit()` не вызывается. При network error/disconnect клиент сообщения не получает, а сервер считает их отданными.

Выбрать стратегию (требует подтверждения, см. конец плана):

- **A. Auto-commit перед ответом.** В конце успешного формирования `messages` вызвать `consumer_obj.commit()`. Семантика "at-most-once": клиент может потерять сообщения при сетевом сбое, но не получит дубликаты.
- **B. Параметр `?auto_commit=true|false`** (default `false`). При `false` хранить "временный" сдвиг отдельно от закоммиченного `pos_record`. Клиент обязан вызвать `POST /commit` для подтверждения. Требует дополнительного поля в Consumer (например, `tentative_pos: u64`) или хранения last-batch в HashMap на стороне сервера.

Для варианта B сервер дополнительно должен:
- хранить snapshot `(pos_record, count_popped)` до выдачи batch'а;
- при следующем запросе без `commit` — откатывать к snapshot перед чтением.

### 1.2 Валидация имен queue/consumer

Файлы: [v-queue-server/src/api.rs](v-queue-server/src/api.rs), все handler'ы с `Path`

- Добавить `fn validate_name(s: &str) -> Result<(), ServerError>`: только `[A-Za-z0-9_-]{1,128}`, отклонение `..`, `/`, `;`, `\n`.
- Применять во всех handler'ах перед вызовом `QueueManager`.
- Защищает от path traversal (`../etc/passwd`) при склейке пути в [src/queue.rs]src/queue.rs:
  ```209:209:src/queue.rs
  let ipp = self.base_path.to_owned() + "/" + &self.name + "-" + &part_id.to_string() + "/" + &self.name + "_info_push";
  ```
- Защищает от format-injection в `_info_*` файлах (разделитель `;`).

### 1.3 Лимит `max_message_size`

Файлы: [v-queue-server/src/config.rs](v-queue-server/src/config.rs), [v-queue-server/src/api.rs](v-queue-server/src/api.rs)

- Прокинуть `max_message_size` из `ServerConfig` через `AppState` (новое поле `max_message_size: usize`).
- В `consume_messages` перед `vec![0u8; msg_length]` (строки 182–184) проверять `msg_length <= max_message_size`. При превышении:
  - Залогировать `error!`.
  - Пропустить запись (вызвать `seek_next_pos` или сдвинуть `pos_record` на `HEADER_SIZE + msg_length` без чтения тела).
  - Не аллоцировать `Vec`, иначе мусорный `msg_length` ~2 ГБ положит сервер.

---

## Фаза 2. Конкурентность и устойчивость

### 2.1 Per-consumer mutex вместо глобального

Файлы: [v-queue-server/src/queue_manager.rs](v-queue-server/src/queue_manager.rs), [v-queue-server/src/main.rs](v-queue-server/src/main.rs), [v-queue-server/src/api.rs](v-queue-server/src/api.rs)

Сейчас всё под одним `Arc<Mutex<QueueManager>>`. Долгий long-poll `consume_messages` блокирует все остальные операции, включая `/health`.

- Заменить структуру:
  ```rust
  pub struct QueueManager {
      base_path: String,
      queues: RwLock<HashMap<String, Arc<Queue>>>,           // только метаданные
      consumers: RwLock<HashMap<String, Arc<Mutex<Consumer>>>>,
      auth_config: AuthConfig,
  }
  ```
- `get_consumer` возвращает `Arc<Mutex<Consumer>>`. API-handler берет короткий read-lock на `consumers`, клонирует `Arc`, отпускает; затем работает с `Consumer` под его собственным mutex.
- В `consume_messages` цикл long-poll держит mutex только на момент `pop_header`+`pop_body`, отпускает на время `tokio::time::sleep`.

### 2.2 Очистка `scan_existing_queues`

Файл: [v-queue-server/src/queue_manager.rs](v-queue-server/src/queue_manager.rs), функция `scan_existing_queues` (строки 35–61)

- После `Queue::new` проверять `queue.is_ready` и что `count_pushed > 0` хотя бы в одной части. Если info-файл пустой/мусорный — не вставлять в HashMap, логировать `warn!`.
- Игнорировать `.lock`-файлы и временные `.tmp` (если ядро добавит атомарную перезапись через rename).
- Не подхватывать файлы, имя которых после `strip_suffix("_info_queue")` не проходит `validate_name` из 1.2.

### 2.3 Убрать `unwrap()` на mutex

Файлы: [v-queue-server/src/api.rs](v-queue-server/src/api.rs), [v-queue-server/src/auth.rs](v-queue-server/src/auth.rs), [v-queue-server/src/queue_manager.rs](v-queue-server/src/queue_manager.rs)

Сейчас по всему коду `state.queue_manager.lock().unwrap()`. После паники в одном handler'е mutex становится poisoned, и все следующие запросы паникуют.

- Заменить на:
  ```rust
  let manager = state.queue_manager.lock()
      .map_err(|_| ServerError::InternalError("queue manager mutex poisoned".into()))?;
  ```
- Аналогично в `auth_middleware` (строка 35) — там сейчас панический unwrap при poison'е.
- После 2.1 многие unwrap'ы исчезнут естественным образом (RwLock читается лоск-фри в hot path).

### 2.4 Убрать `unwrap()` после `get_mut`

Файл: [v-queue-server/src/queue_manager.rs](v-queue-server/src/queue_manager.rs), `get_consumer` (строка 96)

```96:96:v-queue-server/src/queue_manager.rs
Ok(self.consumers.get_mut(&key).unwrap())
```

После рефакторинга 2.1 этой строки не будет, но если 2.1 откладывается — заменить на безопасный паттерн через `entry(...).or_insert_with(...)`.

---

## Фаза 3. Качество данных в API

### 3.1 Рекурсивный обход в `parse_individuals_from_binary_fields`

Файл: [v-queue-server/src/api.rs](v-queue-server/src/api.rs), функция `parse_individuals_from_binary_fields` (строки 78–149)

- Сейчас обходится только верхний уровень `obj.iter_mut()`. Бинарные поля во вложенных объектах не парсятся.
- Сделать рекурсивный обход через `Value` (учитывая объекты и массивы), либо явно документировать, что только верхний уровень поддерживается.

### 3.2 Различать "не Individual" и "поврежденный UTF-8"

Файл: [v-queue-server/src/api.rs](v-queue-server/src/api.rs), ветка `MsgType::String` (строки 189–203)

- Сейчас при неудаче `try_parse_individual_to_json` для `MsgType::String` используется `String::from_utf8_lossy`, что молча подменяет битые байты на `U+FFFD`.
- Добавить fallback на base64 (как для `MsgType::Object`) с заполнением `raw_bytes`, если строка не валидная UTF-8.

### 3.3 Освобождение mutex на время sleep в long-poll

Файл: [v-queue-server/src/api.rs](v-queue-server/src/api.rs), цикл в `consume_messages` (строки 170–276)

- Текущий код освобождает mutex перед `sleep`, что верно. После 2.1 это станет естественным благодаря per-consumer mutex.
- Дополнительно: ограничить `timeout_ms` сверху (например, 30 секунд), чтобы клиент не мог удерживать слот axum'а часами.

---

## Фаза 4. Тесты и наблюдаемость

### 4.1 Тесты

В дополнение к [v-queue-server/tests/integration_tests.rs](v-queue-server/tests/integration_tests.rs) и [v-queue-server/tests/auth_integration_tests.rs](v-queue-server/tests/auth_integration_tests.rs):

- `consume_messages` после ошибки `pop_body` не теряет курсор (зависит от выбранной стратегии в 1.1).
- Path traversal: `GET /api/v1/queues/..%2Fetc/...` должен отдать 400.
- Большой `msg_length` в файле очереди не приводит к OOM (искусственно подменить header в tmp-каталоге).
- Параллельные consume_messages для разных очередей не блокируют друг друга (после 2.1).
- Mutex poison из одного handler'а не валит остальные (после 2.3).

### 4.2 Метрики и логирование

- В `consume_messages` логировать `info!` с `queue`, `consumer`, `count`, `elapsed_ms` после ответа.
- Опционально: счетчики через `metrics` crate (количество отданных сообщений, ошибки CRC, превышения size limit). Не входит в обязательный объем; обсуждается отдельно.

---

## Развилки, требующие подтверждения

1. **Стратегия acknowledgement (фаза 1.1):** A — auto-commit перед ответом (at-most-once, проще), B — `?auto_commit` параметр с tentative позицией (требует доработки и Consumer, и сервера)?
2. **Делать ли push endpoint** в HTTP API? Сейчас писать в очередь можно только напрямую через `Queue::new(ReadWrite)` из соседнего процесса. Если push нужен — отдельная задача (потребует валидации payload и применения `max_message_size`).
3. **Заменять Mutex на RwLock + per-consumer Mutex (фаза 2.1)** в этом PR или отдельным шагом? Это самая объемная правка и может конфликтовать с другими feature-ветками.
4. **Включать ли метрики (фаза 4.2)** в этот PR?