wp-connector-api 0.10.1

Connector runtime traits, config helpers, and errors for WarpParse sinks and sources
# `wp-connector-api` Developer Guide

This guide mirrors the current `wp-connector-api` implementation. It aggregates the configuration adapters, Sink/Source runtime traits, error model, and practical advice so you can build or debug connectors quickly.

## 1. Configuration-Time Interfaces

- **ParamMap**: unified parameter container (`BTreeMap<String, serde_json::Value>`). Use the helpers in `config::param` (`parammap_from_toml_table/map`) to flatten TOML tables and keep keys sorted for stable diffs.
- **ConnectorKindAdapter**: maps human-friendly inputs (such as `conn_url`) into a ParamMap. Implementors must provide:
  - `kind(&self) -> &'static str`: unique identifier.
  - `defaults(&self) -> ParamMap`: connector-specific defaults; returning `ParamMap::new()` means "no defaults".
  - `url_to_params(&self, url: &str) -> anyhow::Result<ParamMap>`: parse the URL; return an error when encountering unsupported formats.
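The adapter contract above can be sketched without the crate. The sketch assumes a hypothetical `tcp` kind that turns `tcp://host[:port]` into `host` / `port` entries, layered on top of `defaults()`. `KindAdapter` and `TcpAdapter` are illustrative names. A `BTreeMap<String, String>` stands in for `ParamMap` (which really stores `serde_json::Value`), and `String` replaces `anyhow::Error`:

```rust
use std::collections::BTreeMap;

// Stand-in for `ParamMap` (the real one holds `serde_json::Value`s).
type ParamMap = BTreeMap<String, String>;

// Hypothetical mirror of the three adapter methods; `String` replaces
// `anyhow::Error` so the sketch only needs the standard library.
trait KindAdapter {
    fn kind(&self) -> &'static str;
    fn defaults(&self) -> ParamMap;
    fn url_to_params(&self, url: &str) -> Result<ParamMap, String>;
}

// Hypothetical adapter for URLs of the form `tcp://host[:port]`.
struct TcpAdapter;

impl KindAdapter for TcpAdapter {
    fn kind(&self) -> &'static str {
        "tcp"
    }

    fn defaults(&self) -> ParamMap {
        let mut p = ParamMap::new();
        p.insert("port".into(), "9000".into());
        p
    }

    fn url_to_params(&self, url: &str) -> Result<ParamMap, String> {
        // Unsupported schemes are rejected instead of guessed at.
        let rest = url
            .strip_prefix("tcp://")
            .ok_or_else(|| format!("unsupported url for kind `{}`: {url}", self.kind()))?;
        // Start from the defaults so URL values override them key by key.
        let mut params = self.defaults();
        // `host:port` overrides the default port; a bare `host` keeps it.
        let host = match rest.rsplit_once(':') {
            Some((host, port)) => {
                port.parse::<u16>()
                    .map_err(|e| format!("invalid port `{port}` in {url}: {e}"))?;
                params.insert("port".into(), port.to_string());
                host
            }
            None => rest,
        };
        if host.is_empty() {
            return Err(format!("missing host in {url}"));
        }
        params.insert("host".into(), host.to_string());
        Ok(params)
    }
}
```

Because the map is ordered, the resulting parameters always print in the same key order, which is the property that keeps config diffs stable.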

### 1.1 Connector Definition API

**ConnectorScope**: an enum representing the connector's scope:
- `Source` (default): data source connector.
- `Sink`: data sink connector.

**ConnectorDef**: connector metadata struct with the following fields:
- `id: String`: unique connector identifier.
- `kind: String`: connector type (serialized as `type`).
- `scope: ConnectorScope`: runtime-only field, not serialized.
- `allow_override: Vec<String>`: list of parameter keys that can be overridden.
- `default_params: ParamMap`: default parameters (serialized as `params`).
- `origin: Option<String>`: origin identifier, runtime-only field, not serialized.

`ConnectorDef` provides a builder method:
- `with_scope(scope: ConnectorScope) -> Self`: set the scope and return self.

**SourceDefProvider** trait: interface for Source connector definition and validation.
- `source_def(&self) -> ConnectorDef`: returns the Source connector definition (required).
- `validate_source(&self, def: &ConnectorDef) -> Result<(), String>`: validate a Source definition; defaults to `Ok(())`.

**SinkDefProvider** trait: interface for Sink connector definition and validation.
- `sink_def(&self) -> ConnectorDef`: returns the Sink connector definition (required).
- `validate_sink(&self, def: &ConnectorDef) -> Result<(), String>`: validate a Sink definition; defaults to `Ok(())`.

> Connectors can implement one or both traits as needed. For example, a pure Source connector only needs to implement `SourceDefProvider`, while a pure Sink connector only needs `SinkDefProvider`.
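A `validate_source` or `validate_sink` body returns `Result<(), String>`, so checks can be written as plain functions over the definition. The sketch below uses a trimmed, hypothetical mirror of `ConnectorDef` and a hypothetical rule set (non-empty `id`, expected `kind`, no duplicate `allow_override` keys). It is an illustration, not the crate's built-in validation:

```rust
use std::collections::{BTreeMap, BTreeSet};

// Hypothetical, trimmed mirror of `ConnectorDef` with only the fields used here.
#[derive(Default)]
struct ConnectorDef {
    id: String,
    kind: String,
    allow_override: Vec<String>,
    default_params: BTreeMap<String, String>,
}

// Hypothetical checks a provider could run from `validate_source`/`validate_sink`.
fn validate_def(def: &ConnectorDef, expected_kind: &str) -> Result<(), String> {
    if def.id.trim().is_empty() {
        return Err("connector id must not be empty".into());
    }
    if def.kind != expected_kind {
        return Err(format!("{}: expected kind `{expected_kind}`, got `{}`", def.id, def.kind));
    }
    // Duplicate override keys usually indicate a copy-paste mistake.
    let mut seen = BTreeSet::new();
    for key in &def.allow_override {
        if !seen.insert(key.as_str()) {
            return Err(format!("{}: duplicate allow_override key `{key}`", def.id));
        }
    }
    Ok(())
}
```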

## 2. Sink Runtime Interfaces

### 2.1 Core Traits

- `AsyncCtrl` – runtime control.
  - `stop(&mut self)`: idempotent shutdown; stop all tasks and release resources.
  - `reconnect(&mut self)`: rebuild connections or reset state without changing external semantics.
- `AsyncRecordSink` – structured records.
  - `sink_record(&mut self, &DataRecord)`: single record.
  - `sink_records(&mut self, Vec<Arc<DataRecord>>)`: batch write while preserving order.
- `AsyncRawDataSink` – raw text/bytes.
  - `sink_str` / `sink_bytes`: single payload.
  - `sink_str_batch` / `sink_bytes_batch`: batch payloads.
- `AsyncSink` – blanket impl combining `AsyncCtrl + AsyncRecordSink + AsyncRawDataSink + Send + Sync`. Implement `impl AsyncSink for MySink {}` so the orchestrator can treat your sink uniformly.
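Two of the contracts above, idempotent `stop` and order-preserving batch writes, can be illustrated with a synchronous stand-in. `RecordSink`, `VecSink`, and `Record` are hypothetical names; the real traits are async and use `SinkResult`. Here a default `sink_records` simply forwards each record in sequence, which is one straightforward way to preserve order:

```rust
use std::sync::Arc;

// Stand-in for `DataRecord` (the real type lives in `wp_model_core`).
struct Record(String);

// Hypothetical synchronous mirror of the record-sink contract. The default
// batch method forwards records one by one, so input order is preserved.
trait RecordSink {
    fn sink_record(&mut self, rec: &Record) -> Result<(), String>;

    fn sink_records(&mut self, recs: Vec<Arc<Record>>) -> Result<(), String> {
        for rec in recs {
            self.sink_record(&rec)?;
        }
        Ok(())
    }
}

struct VecSink {
    out: Vec<String>,
    stopped: bool,
}

impl VecSink {
    // Idempotent shutdown: the first call releases resources, later calls are no-ops.
    fn stop(&mut self) -> Result<(), String> {
        if self.stopped {
            return Ok(());
        }
        self.out.shrink_to_fit(); // placeholder for closing real connections
        self.stopped = true;
        Ok(())
    }
}

impl RecordSink for VecSink {
    fn sink_record(&mut self, rec: &Record) -> Result<(), String> {
        if self.stopped {
            return Err("sink already stopped".into());
        }
        self.out.push(rec.0.clone());
        Ok(())
    }
}
```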

### 2.2 Build Pipeline

- `SinkBuildCtx`
  - `work_root: PathBuf`: sandbox directory per instance.
  - `replica_idx/replica_cnt`: zero-based replica index and total replicas (>= 1).
  - `rate_limit_rps`: upstream hint for rate limiting.
- `ResolvedSinkSpec`
  - `group/name/kind/connector_id`: identifiers.
  - `params: ParamMap`: flattened runtime params.
  - `filter: Option<String>`: optional filter string; semantics depend on the caller.
- `SinkFactory`
  - `kind()`: registry name.
  - `validate_spec()`: optional lightweight validation (defaults to no-op).
  - `build(spec, ctx) -> SinkHandle`: construct `Box<dyn AsyncSink>`; propagate `anyhow::Error` on failure.
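The build context is typically consumed in two places: a cheap parameter check before anything is opened, and a per-replica layout under `work_root`. The sketch below shows both as free functions. `replica_dir`, `validate_params`, and the `<name>-<idx>` directory naming are hypothetical, and `BTreeMap<String, String>` stands in for `ParamMap`:

```rust
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};

// Hypothetical: a private directory for one replica under `work_root`,
// rejecting indices outside `0..replica_cnt`.
fn replica_dir(work_root: &Path, name: &str, replica_idx: usize, replica_cnt: usize) -> Result<PathBuf, String> {
    if replica_cnt == 0 || replica_idx >= replica_cnt {
        return Err(format!("replica {replica_idx} out of range 0..{replica_cnt}"));
    }
    Ok(work_root.join(format!("{name}-{replica_idx}")))
}

// Hypothetical lightweight check in the spirit of `validate_spec`: it only
// verifies that required keys are present and non-empty, opening nothing.
fn validate_params(params: &BTreeMap<String, String>, required: &[&str]) -> Result<(), String> {
    for key in required {
        match params.get(*key) {
            Some(v) if !v.is_empty() => {}
            _ => return Err(format!("missing required param `{key}`")),
        }
    }
    Ok(())
}
```

Keeping one directory per replica avoids two replicas of the same sink writing to the same files.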

## 3. Source Runtime Interfaces

### 3.1 `DataSource` Behavior

- `receive(&mut self) -> SourceResult<SourceBatch>`: the only mandatory method. A batch may be empty to indicate “no data yet”.
- `try_receive(&mut self) -> Option<SourceBatch>`: only call it when `supports_try_receive()` and `can_try_receive()` both return `true`; when they do not, implementations return `None`.
- `supports_try_receive(&self)`: static capability (default `false`).
- `can_try_receive(&mut self)`: dynamic capability (defaults to the static value).
- `identifier()` / `identifier_ref()`: names used in logs and metrics. The default `identifier_ref()` falls back to building an owned `String`; override it to avoid that allocation.
- Lifecycle hooks:
  - `start(&mut self, CtrlRx)`: prepare resources and start listening to control events (default no-op).
  - `close(&mut self)`: idempotent shutdown.
- Optional capabilities:
  - `caps(&self) -> SourceCaps`: advertise `ack` / `seek` / `parallel` support.
  - `ack(&mut self, Arc<dyn AckToken>)`: the default implementation returns `SupplierError("ack unsupported")`.
  - `seek(&mut self, Arc<dyn SeekPosition>)`: the default implementation returns `SupplierError("seek unsupported")`.
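On the caller side, the capability rules above translate into a small gate: use `try_receive` only when both checks pass, and fall back to `receive` otherwise. The sketch uses a hypothetical synchronous trait `PullSource` whose default methods mirror the defaults listed above (static capability `false`, dynamic capability equal to the static one, `try_receive` returning `None`); the real `DataSource` is async and returns `SourceResult`:

```rust
// Hypothetical synchronous stand-in for the pull side of `DataSource`.
trait PullSource {
    fn receive(&mut self) -> Result<Vec<String>, String>;
    fn try_receive(&mut self) -> Option<Vec<String>> {
        None
    }
    fn supports_try_receive(&self) -> bool {
        false
    }
    fn can_try_receive(&mut self) -> bool {
        self.supports_try_receive()
    }
}

// Use the fast path only when both capability checks pass; otherwise, or
// when it yields nothing, fall back to `receive`.
fn next_batch<S: PullSource>(src: &mut S) -> Result<Vec<String>, String> {
    if src.supports_try_receive() && src.can_try_receive() {
        if let Some(batch) = src.try_receive() {
            return Ok(batch);
        }
    }
    src.receive()
}

// Hypothetical source: `try_receive` hands out one item at a time,
// `receive` drains everything (an empty batch means "no data yet").
struct DemoSource {
    fast_path: bool,
    pending: Vec<String>,
}

impl PullSource for DemoSource {
    fn receive(&mut self) -> Result<Vec<String>, String> {
        Ok(std::mem::take(&mut self.pending))
    }
    fn try_receive(&mut self) -> Option<Vec<String>> {
        if self.pending.is_empty() {
            None
        } else {
            Some(vec![self.pending.remove(0)])
        }
    }
    fn supports_try_receive(&self) -> bool {
        self.fast_path
    }
}
```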

### 3.2 Events and Control

- `SourceEvent`
  - Fields: `event_id`, `src_key`, `payload: RawData`, `tags: Arc<Tags>`, `ups_ip`, `preproc`. `payload` accepts `String`, `Bytes`, or `Arc<Vec<u8>>`; debug output summarizes lengths.
- `ControlEvent`
  - `Stop`: request immediate stop.
  - `Isolate(bool)`: pause (`true`) or resume (`false`).
  - `Seek(Arc<dyn SeekPosition>)`: seek to a position.
- `CtrlRx = async_broadcast::Receiver<ControlEvent>`: listen inside `start()` for orchestrator commands.
- `Tags`: sorted `SmallVec` with `set/get/is_empty` helpers; unit tests verify that iteration order is deterministic.
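A source that listens to `CtrlRx` ends up applying each `ControlEvent` to its own state. The sketch below shows that mapping with `std::sync::mpsc` standing in for `async_broadcast`, a hypothetical `Control` enum mirroring `ControlEvent`, and a plain `u64` offset in place of `Arc<dyn SeekPosition>`. Stopping on `Stop` without consuming later events is a design choice of this sketch:

```rust
use std::sync::mpsc::Receiver;

// Hypothetical mirror of `ControlEvent`; `u64` replaces `Arc<dyn SeekPosition>`.
#[derive(Debug, Clone, PartialEq)]
enum Control {
    Stop,
    Isolate(bool),
    Seek(u64),
}

#[derive(Debug, Default, PartialEq)]
struct SourceState {
    paused: bool,
    offset: u64,
    stopped: bool,
}

// Applies all pending control events without blocking. `Stop` ends the
// loop immediately, so events queued after it are left unread.
fn drain_control(rx: &Receiver<Control>, state: &mut SourceState) {
    while let Ok(ev) = rx.try_recv() {
        match ev {
            Control::Stop => {
                state.stopped = true;
                break;
            }
            // `true` pauses, `false` resumes.
            Control::Isolate(pause) => state.paused = pause,
            Control::Seek(pos) => state.offset = pos,
        }
    }
}
```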

### 3.3 `SourceFactory` Pipeline

- `SourceBuildCtx { work_root }`: provides per-instance workspace similar to sinks.
- `SourceMeta { name, kind, tags }`: metadata for UI/monitoring.
- `SourceHandle { source, metadata }`: a pull-based instance.
- `AcceptorHandle { name, acceptor }`: server-side listener (HTTP, gRPC, ...).
- `SourceSvcIns { sources, acceptor }`: return value of `SourceFactory::build`, allowing multiple sources plus an optional acceptor.
- `ResolvedSourceSpec`: fields `name`, `kind`, `connector_id`, `params: ParamMap`, `tags: Vec<String>`. Factories must implement `kind()` and `build(spec, ctx)`; `validate_spec()` is optional.
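Because `SourceSvcIns` can carry several sources, one spec may fan out into multiple pull-based instances. The sketch assumes a hypothetical `files` parameter holding a comma-separated list and builds one source per entry. `Handle` and `SvcIns` are simplified stand-ins for `SourceHandle` and `SourceSvcIns`, not the crate's constructors:

```rust
use std::collections::BTreeMap;
use std::path::PathBuf;

// Hypothetical stand-ins for `SourceHandle` / `SourceSvcIns`.
struct Handle {
    name: String,
    path: PathBuf,
}

struct SvcIns {
    sources: Vec<Handle>,
    acceptor: Option<String>,
}

// Hypothetical fan-out: `files = "a.log,b.log"` yields one source per file,
// each named `<spec name>#<index>`.
fn build_file_sources(spec_name: &str, params: &BTreeMap<String, String>) -> Result<SvcIns, String> {
    let files = params.get("files").ok_or("missing `files` param")?;
    let sources: Vec<Handle> = files
        .split(',')
        .map(str::trim)
        .filter(|f| !f.is_empty())
        .enumerate()
        .map(|(i, f)| Handle { name: format!("{spec_name}#{i}"), path: PathBuf::from(f) })
        .collect();
    if sources.is_empty() {
        return Err("`files` param is empty".into());
    }
    // File sources are pulled, so no server-side acceptor is needed.
    Ok(SvcIns { sources, acceptor: None })
}
```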

## 4. Error Model

- **Sink**: `SinkReason` / `SinkError` wrap `orion_error::StructError`. Use `SinkReason::sink(ctx)` and the `SinkErrorOwe` helper (`some_call().owe_sink("context")?`) to annotate external failures.
  - Error code mapping:
    - `Sink(String)`: 500 — General sink unavailable
    - `Mock`: 599 — Mock/test error
    - `StgCtrl`: 510 — Storage control error
    - `Uvs(UvsReason)`: Delegates to inner UvsReason's error code
- **Source**: `SourceReason` / `SourceError` mirror the sink side.
  - Error code mapping:
    - `NotData`: 100 — Temporary no data available (normal)
    - `EOF`: 101 — End of data stream (normal)
    - `Disconnect(String)`: 503 — Connection lost (retryable)
    - `SupplierError(String)`: 500 — Upstream supplier error
    - `Other(String)`: 520 — Unclassified error
    - `Uvs(UvsReason)`: Delegates to inner UvsReason's error code
  - `SourceResult<T>` is an alias for `Result<T, StructError<SourceReason>>`.
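The source-side code table can be expressed as a single `match`, which also makes it easy for callers to separate normal flow (`NotData`, `EOF`) from retryable failures. The enum below is a hypothetical mirror of `SourceReason` with `Uvs(u16)` standing in for `Uvs(UvsReason)`, so delegation just returns the carried code. The `is_normal` / `is_retryable` helpers are an illustrative caller policy derived from the table's annotations, not crate methods:

```rust
// Hypothetical mirror of `SourceReason` with the documented codes.
// `Uvs(u16)` stands in for `Uvs(UvsReason)` and carries the inner code.
#[derive(Debug)]
enum SourceReason {
    NotData,
    EOF,
    Disconnect(String),
    SupplierError(String),
    Other(String),
    Uvs(u16),
}

impl SourceReason {
    fn code(&self) -> u16 {
        match self {
            SourceReason::NotData => 100,
            SourceReason::EOF => 101,
            SourceReason::Disconnect(_) => 503,
            SourceReason::SupplierError(_) => 500,
            SourceReason::Other(_) => 520,
            // Delegate to the inner reason's code.
            SourceReason::Uvs(inner) => *inner,
        }
    }

    // Normal flow: no data yet, or end of stream.
    fn is_normal(&self) -> bool {
        matches!(self, SourceReason::NotData | SourceReason::EOF)
    }

    // The table marks only `Disconnect` as retryable.
    fn is_retryable(&self) -> bool {
        matches!(self, SourceReason::Disconnect(_))
    }
}
```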

## 5. Example: In-Memory Connector

Below is a complete in-memory connector example implementing both Source and Sink. Use it as a reference template when developing new connectors.

### 5.1 MemorySource Implementation

```rust
use std::sync::Arc;
use async_trait::async_trait;
use wp_connector_api::{
    DataSource, SourceBatch, SourceEvent, SourceResult, Tags,
};
use wp_parse_api::RawData;

struct MemorySource {
    name: String,
    events: Vec<String>,
    cursor: usize,
}

#[async_trait]
impl DataSource for MemorySource {
    async fn receive(&mut self) -> SourceResult<SourceBatch> {
        if self.cursor >= self.events.len() {
            return Ok(vec![]); // Exhausted: an empty batch means "no data yet"
        }
        let event = SourceEvent::new(
            self.cursor as u64,
            Arc::new(self.name.clone()),
            RawData::from_string(&self.events[self.cursor]),
            Arc::new(Tags::default()),
        );
        self.cursor += 1;
        Ok(vec![event])
    }

    fn try_receive(&mut self) -> Option<SourceBatch> {
        if self.cursor < self.events.len() {
            let event = SourceEvent::new(
                self.cursor as u64,
                Arc::new(self.name.clone()),
                RawData::from_string(&self.events[self.cursor]),
                Arc::new(Tags::default()),
            );
            self.cursor += 1;
            Some(vec![event])
        } else {
            None
        }
    }

    fn supports_try_receive(&self) -> bool { true }

    fn identifier(&self) -> String {
        format!("memory-source:{}", self.name)
    }
}
```

### 5.2 MemorySink Implementation

```rust
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use wp_connector_api::{
    AsyncCtrl, AsyncRawDataSink, AsyncRecordSink, SinkResult,
};
use wp_model_core::model::DataRecord;

#[derive(Clone, Default)]
struct MemorySinkBuffer {
    data: Arc<Mutex<Vec<String>>>,
}

struct MemorySink {
    buffer: MemorySinkBuffer,
}

#[async_trait]
impl AsyncCtrl for MemorySink {
    async fn stop(&mut self) -> SinkResult<()> { Ok(()) }
    async fn reconnect(&mut self) -> SinkResult<()> { Ok(()) }
}

#[async_trait]
impl AsyncRecordSink for MemorySink {
    async fn sink_record(&mut self, _data: &DataRecord) -> SinkResult<()> {
        self.buffer.data.lock().unwrap().push("record".into());
        Ok(())
    }
    async fn sink_records(&mut self, data: Vec<Arc<DataRecord>>) -> SinkResult<()> {
        let mut buf = self.buffer.data.lock().unwrap();
        for _ in data { buf.push("record".into()); }
        Ok(())
    }
}

#[async_trait]
impl AsyncRawDataSink for MemorySink {
    async fn sink_str(&mut self, data: &str) -> SinkResult<()> {
        self.buffer.data.lock().unwrap().push(data.to_string());
        Ok(())
    }
    async fn sink_bytes(&mut self, data: &[u8]) -> SinkResult<()> {
        self.buffer.data.lock().unwrap().push(String::from_utf8_lossy(data).into());
        Ok(())
    }
    async fn sink_str_batch(&mut self, data: Vec<&str>) -> SinkResult<()> {
        let mut buf = self.buffer.data.lock().unwrap();
        for s in data { buf.push(s.to_string()); }
        Ok(())
    }
    async fn sink_bytes_batch(&mut self, data: Vec<&[u8]>) -> SinkResult<()> {
        let mut buf = self.buffer.data.lock().unwrap();
        for b in data { buf.push(String::from_utf8_lossy(b).into()); }
        Ok(())
    }
}
```

### 5.3 ConnectorFactory Implementation

```rust
use async_trait::async_trait;
use wp_connector_api::{
    ConnectorDef, ConnectorScope, SinkDefProvider, SourceDefProvider,
    SinkFactory, SinkHandle, SinkBuildCtx, SinkSpec, SinkResult,
    SourceFactory, SourceHandle, SourceMeta, SourceBuildCtx,
    SourceSpec, SourceResult, SourceSvcIns,
};

struct DemoConnectorFactory {
    sink_buffer: MemorySinkBuffer,
    source_events: Vec<String>,
}

// Implement SourceDefProvider as needed
impl SourceDefProvider for DemoConnectorFactory {
    fn source_def(&self) -> ConnectorDef {
        ConnectorDef {
            id: "demo-source".into(),
            kind: "memory".into(),
            scope: ConnectorScope::Source,
            allow_override: vec!["events".into()],
            default_params: Default::default(),
            origin: Some("demo".into()),
        }
    }
}

// Implement SinkDefProvider as needed
impl SinkDefProvider for DemoConnectorFactory {
    fn sink_def(&self) -> ConnectorDef {
        ConnectorDef {
            id: "demo-sink".into(),
            kind: "memory".into(),
            scope: ConnectorScope::Sink,
            ..Default::default()
        }
    }
}

#[async_trait]
impl SourceFactory for DemoConnectorFactory {
    fn kind(&self) -> &'static str { "memory" }

    async fn build(&self, spec: &SourceSpec, _ctx: &SourceBuildCtx) -> SourceResult<SourceSvcIns> {
        let source = MemorySource {
            name: spec.name.clone(),
            events: self.source_events.clone(),
            cursor: 0,
        };
        let handle = SourceHandle::new(
            Box::new(source),
            SourceMeta::new(&spec.name, "memory"),
        );
        Ok(SourceSvcIns::new().with_sources(vec![handle]))
    }
}

#[async_trait]
impl SinkFactory for DemoConnectorFactory {
    fn kind(&self) -> &'static str { "memory" }

    async fn build(&self, _spec: &SinkSpec, _ctx: &SinkBuildCtx) -> SinkResult<SinkHandle> {
        Ok(SinkHandle::new(Box::new(MemorySink {
            buffer: self.sink_buffer.clone(),
        })))
    }
}
```

See `wp-connector-api/tests/demo_connector.rs` for the complete working example.

## 6. Practical Tips

1. **Validate early**: run parameter checks in `ConnectorKindAdapter::url_to_params` and `SinkFactory::validate_spec` to catch issues before runtime.
2. **Enforce idempotency**: `stop`, `receive`, `ack`, etc. must be safe to retry. Never `unwrap/expect`; propagate errors via `SinkResult`/`SourceResult`.
3. **Prioritize zero-copy**: `SourceEvent.payload` can be `RawData::ArcBytes`. Provide `Arc<Vec<u8>>` so `into_bytes()` can attempt `Arc::try_unwrap` and reuse buffers when the refcount is 1.
4. **Test ParamMap conversions**: `config::param` already has unit tests; extend them when adding more complex structures.
5. **Keep docs in sync**: whenever you add APIs or change semantics, update both the Chinese (docs/zh) and English (docs/en) versions.
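Tip 3 relies on standard `Arc::try_unwrap`: when the caller holds the last reference, the `Vec<u8>` is moved out without copying; otherwise the shared `Arc` is handed back and the bytes must be cloned. The helper name `into_owned_bytes` is hypothetical and only illustrates that pattern; it is not the body of `RawData::into_bytes`:

```rust
use std::sync::Arc;

// Takes ownership of the bytes without copying when this is the last
// reference (refcount 1); clones the buffer only when it is still shared.
fn into_owned_bytes(buf: Arc<Vec<u8>>) -> Vec<u8> {
    Arc::try_unwrap(buf).unwrap_or_else(|shared| shared.as_ref().clone())
}
```

In the unshared case the returned `Vec` points at the same heap allocation the `Arc` held, so large payloads pass through without a copy.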

Questions? Mention the WarpParse Dev Team in your PR or issue.