Crate em_filter

§em_filter — Rust SDK for the Emergence network

em_filter lets any Rust process join the Emergence distributed discovery network as a filter agent. A filter agent receives search queries from the em_disco broker, processes them (web search, DNS lookup, LLM call, database query, …), and returns structured results.

This crate is the Rust equivalent of the Erlang em_filter library — same WebSocket protocol, same configuration contract, idiomatic Rust API.


§How it works

 ┌─────────────┐    WebSocket     ┌────────────────┐    WebSocket     ┌──────────────┐
 │  em_disco   │ ◄─────────────── │  FilterRunner  │ ──────────────── │  em_disco    │
 │  (broker)   │  query / result  │  (your agent)  │   (multi-node)   │  (replica)   │
 └─────────────┘                  └───────┬────────┘                  └──────────────┘
                                          │
                                    Arc<Mutex<F>>
                                          │
                                   ┌──────┴───────┐
                                   │ your Filter  │
                                   │    impl      │
                                   └──────────────┘

  1. FilterRunner resolves disco nodes from config / env / defaults.
  2. It spawns one tokio task per node; each task maintains a persistent WebSocket connection with automatic reconnection.
  3. When em_disco sends a query frame, the task acquires the shared Arc<Mutex<F>>, calls your Filter::handle implementation, and sends back a result frame.
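The sharing pattern in step 3 can be sketched without any networking. This is a minimal illustration, not the crate's code: it uses std threads as a stand-in for tokio tasks, leaves out the WebSocket connection and reconnection entirely, and the names Handler, Counting and run_workers are hypothetical.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical, synchronous stand-in for the crate's `Filter` trait.
trait Handler: Send {
    fn handle(&mut self, body: &str) -> String;
}

/// A stateful handler that counts how many queries it has answered.
struct Counting {
    seen: usize,
}

impl Handler for Counting {
    fn handle(&mut self, body: &str) -> String {
        self.seen += 1;
        format!("#{} {}", self.seen, body)
    }
}

/// Spawns one worker per node. Every worker shares the same handler and
/// locks it for the duration of a single `handle` call.
/// Returns the shared handler so the caller can inspect its final state.
fn run_workers<H: Handler + 'static>(handler: H, nodes: &[&str]) -> Arc<Mutex<H>> {
    let shared = Arc::new(Mutex::new(handler));
    let workers: Vec<_> = nodes
        .iter()
        .map(|node| {
            let shared = Arc::clone(&shared);
            // Assumption: each node delivers exactly one query in this sketch.
            let query = format!("query from {}", node);
            thread::spawn(move || {
                let mut guard = shared.lock().unwrap();
                guard.handle(&query)
            })
        })
        .collect();
    for worker in workers {
        worker.join().unwrap();
    }
    shared
}
```

Calling run_workers(Counting { seen: 0 }, &["a:8080", "b:8080"]) leaves seen at 2. Because every worker goes through the same mutex, calls to the handler are serialized across nodes, which is what lets handle take &mut self.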

§Quick start

Add to Cargo.toml:

[dependencies]
em_filter          = "0.1"
serde_json         = "1"
tokio              = { version = "1", features = ["full"] }
tracing-subscriber = "0.3"   # needed for tracing_subscriber::fmt::init() in the example below

Implement the Filter trait and run it:

use em_filter::{async_trait, AgentConfig, EmFilterError, Filter, FilterRunner};
use serde_json::{json, Value};

struct MyFilter;

#[async_trait]
impl Filter for MyFilter {
    async fn handle(&mut self, body: &str) -> Result<Value, EmFilterError> {
        // `body` is the raw query string, e.g. "erlang otp"
        Ok(json!([{
            "type": "url",
            "properties": {
                "url":   "https://example.com",
                "title": format!("Result for: {}", body)
            }
        }]))
    }

    fn capabilities(&self) -> Vec<String> {
        vec!["search".into(), "query".into(), "web".into()]
    }
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();
    FilterRunner::new("my_filter", MyFilter, AgentConfig::default())
        .run()
        .await
        .unwrap();
}

By default the agent connects to localhost:8080. See AgentConfig for environment variables and emergence.conf configuration.

A runnable echo_filter example is included in the crate. It connects to em_disco and echoes every query back — useful for verifying your broker setup:

cargo run --example echo_filter

§Configuration

Node discovery follows this priority order (same as the Erlang library):

| Priority | Source                                 |
|----------|----------------------------------------|
| 1        | AgentConfig::disco_nodes (explicit)    |
| 2        | EM_DISCO_HOST / EM_DISCO_PORT env vars |
| 3        | [em_disco] nodes = … in emergence.conf |
| 4        | localhost:8080 (built-in default)      |
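
The priority order above amounts to a first-match-wins fallback chain. The sketch below is one way to express it, not the crate's implementation: Node and resolve_nodes are hypothetical names, the inputs are passed in as plain values rather than read from the environment or a file, and the fallback to port 8080 when only a host is given is an assumption.

```rust
/// Hypothetical node representation: (host, port).
type Node = (String, u16);

/// Picks the node list from the first source that provides one.
fn resolve_nodes(
    explicit: &[Node],        // 1. AgentConfig::disco_nodes
    env_host: Option<&str>,   // 2. EM_DISCO_HOST
    env_port: Option<&str>,   // 2. EM_DISCO_PORT
    conf_nodes: &[Node],      // 3. nodes from emergence.conf
) -> Vec<Node> {
    if !explicit.is_empty() {
        return explicit.to_vec();
    }
    if let Some(host) = env_host {
        // Assumption: a missing or unparsable port falls back to 8080.
        let port: u16 = env_port
            .and_then(|p| p.parse::<u16>().ok())
            .unwrap_or(8080);
        return vec![(host.to_string(), port)];
    }
    if !conf_nodes.is_empty() {
        return conf_nodes.to_vec();
    }
    // 4. Built-in default.
    vec![("localhost".to_string(), 8080)]
}
```

With no explicit nodes, no environment variables and an empty configuration file, resolve_nodes(&[], None, None, &[]) yields the single default node localhost:8080.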

Environment variables:

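| Variable               | Default | Description                     |
|------------------------|---------|---------------------------------|
| EM_DISCO_HOST          |         | Disco broker hostname           |
| EM_DISCO_PORT          |         | Disco broker port               |
| EM_FILTER_JWT_TOKEN    |         | JWT for authenticated brokers   |
| EM_FILTER_RECONNECT_MS | 5000    | Reconnect delay in milliseconds |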
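
As an illustration of how a millisecond setting such as EM_FILTER_RECONNECT_MS could be turned into a delay, here is a small sketch. The function names are hypothetical, and falling back to the default of 5000 on an invalid value is an assumption; the crate may treat bad input differently.

```rust
use std::time::Duration;

/// Default taken from the table above.
const DEFAULT_RECONNECT_MS: u64 = 5000;

/// Parses a raw setting into a delay.
/// Assumption: unset or unparsable values fall back to the default.
fn reconnect_delay(raw: Option<&str>) -> Duration {
    let ms = raw
        .and_then(|s| s.trim().parse::<u64>().ok())
        .unwrap_or(DEFAULT_RECONNECT_MS);
    Duration::from_millis(ms)
}

/// Reads the setting from the process environment.
fn reconnect_delay_from_env() -> Duration {
    reconnect_delay(std::env::var("EM_FILTER_RECONNECT_MS").ok().as_deref())
}
```

Keeping the parsing in a function that takes Option<&str> makes it testable without touching the real environment.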

TLS is inferred automatically:

  • localhost / 127.0.0.1 / ::1 → plain WebSocket (ws://)
  • Remote host on port 443 → TLS WebSocket (wss://)
  • Remote host on any other port → plain WebSocket (ws://)
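
The three rules above reduce to a single condition: TLS is used only for a non-local host on port 443. A minimal sketch follows, with hypothetical function names; the bracketing of IPv6 literals is standard URL syntax, and no request path is shown because the source does not specify one.

```rust
/// Returns the WebSocket scheme implied by the rules above.
fn infer_scheme(host: &str, port: u16) -> &'static str {
    let is_local = matches!(host, "localhost" | "127.0.0.1" | "::1");
    if !is_local && port == 443 {
        "wss"
    } else {
        "ws"
    }
}

/// Builds scheme://host:port for a node (hypothetical helper).
fn node_url(host: &str, port: u16) -> String {
    // IPv6 literals must be wrapped in brackets inside a URL authority.
    let authority = if host.contains(':') {
        format!("[{}]:{}", host, port)
    } else {
        format!("{}:{}", host, port)
    };
    format!("{}://{}", infer_scheme(host, port), authority)
}
```

Note that a local broker on port 443 still gets ws://, since the loopback rule is listed first.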

§HTML utilities

The crate ships a small set of HTML helpers useful when scraping web pages. They mirror the Erlang em_filter module’s function signatures:

use em_filter::{strip_scripts, get_text, extract_elements, extract_attribute,
                decode_html_entities, should_skip_link};

let html = r#"<p>Hello <b>world</b></p><script>alert(1)</script>"#;

let clean   = strip_scripts(html).unwrap();           // removes <script> blocks
let text    = get_text(&clean);                        // "Hello world"
let links   = extract_elements(html, "a");             // Vec<String> of inner HTML
let href    = extract_attribute(r#"<a href="/x">"#, "href"); // Some("/x")
let decoded = decode_html_entities("caf&eacute;");     // "café"
let skip    = should_skip_link("https://ads.com", &["ads.com"]); // true

§WebSocket protocol

The agent speaks a simple JSON-over-WebSocket protocol to em_disco:

Agent → Disco:

{ "action": "register",    "name": "<agent_name>" }
{ "action": "agent_hello", "capabilities": ["search", "query", "web"] }
{ "action": "result",      "id": "<query_id>", "data": <result> }

Disco → Agent:

{ "status": "ok", "action": "registered" }
{ "status": "ok", "action": "agent_registered", "capabilities": [...] }
{ "action": "query", "id": "<query_id>", "body": "<query_string>" }
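
To make the shape of the agent-to-disco frames concrete, the following dependency-free sketch serializes them by hand. AgentFrame, json_string and to_json are hypothetical names and not part of the crate; in real code the json! macro from serde_json, already listed in the quick start, is the more natural choice. The result payload is passed in as pre-serialized JSON.

```rust
/// Encodes a string as a JSON string literal (minimal escaping).
fn json_string(s: &str) -> String {
    let mut out = String::with_capacity(s.len() + 2);
    out.push('"');
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out.push('"');
    out
}

/// The three frames an agent sends, as listed above.
enum AgentFrame<'a> {
    Register { name: &'a str },
    AgentHello { capabilities: &'a [&'a str] },
    /// `data_json` must already be valid JSON text.
    QueryResult { id: &'a str, data_json: &'a str },
}

impl AgentFrame<'_> {
    fn to_json(&self) -> String {
        match self {
            AgentFrame::Register { name } => {
                format!(r#"{{"action":"register","name":{}}}"#, json_string(name))
            }
            AgentFrame::AgentHello { capabilities } => {
                let caps: Vec<String> = capabilities.iter().map(|c| json_string(c)).collect();
                format!(r#"{{"action":"agent_hello","capabilities":[{}]}}"#, caps.join(","))
            }
            AgentFrame::QueryResult { id, data_json } => {
                format!(r#"{{"action":"result","id":{},"data":{}}}"#, json_string(id), data_json)
            }
        }
    }
}
```

The id in a result frame is the one received in the matching query frame, which is how the broker pairs a reply with its request.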

Structs§

AgentConfig
Configuration for a filter agent.
DiscoNode
A single em_disco node to connect to.
FilterRunner
Runs a filter agent by connecting to all configured em_disco nodes.

Enums§

EmFilterError
All errors produced by the em_filter library.

Traits§

Filter
The handler contract for an Emergence filter agent.

Functions§

decode_html_entities
Decodes &#N;, &#xHH;, and &name; HTML entities in a string.
extract_attribute
Extracts the value of an HTML attribute from the first element in a fragment.
extract_elements
Extracts elements matching a CSS selector, returning the inner HTML of each match.
get_text
Strips all HTML tags from a string, returning plain text.
should_skip_link
Returns true if a link should be skipped by a web scraper.
strip_scripts
Removes all <script>…</script> blocks from an HTML string.

Attribute Macros§

async_trait