em_filter
Rust SDK for building Emergence network agents.
em_filter lets any Rust process join the Emergence distributed discovery network
as a filter agent — a service that receives search queries from the em_disco
broker, processes them (web search, DNS lookup, LLM call, database query, …), and
returns structured results.
This crate is the Rust equivalent of the Erlang em_filter library: same WebSocket
protocol, same configuration contract, idiomatic Rust API.
How it works
┌─────────────┐    WebSocket     ┌───────────────┐    WebSocket    ┌─────────────┐
│  em_disco   │ ◄─────────────── │ FilterRunner  │ ─────────────── │  em_disco   │
│  (broker)   │  query / result  │ (your agent)  │  (multi-node)   │  (replica)  │
└─────────────┘                  └───────────────┘                 └─────────────┘
                                         │
                                   Arc<Mutex<F>>
                                         │
                                  ┌──────┴──────┐
                                  │ your Filter │
                                  │    impl     │
                                  └─────────────┘
- FilterRunner resolves disco nodes and spawns one tokio task per node.
- Each task maintains a persistent WebSocket connection with automatic reconnection.
- On a query frame, the task calls your Filter::handle and sends back a result frame.
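The automatic reconnection described above can be pictured as a loop around a single connect-and-serve call. The sketch below is not the crate's implementation. `run_session` is a hypothetical stand-in for one WebSocket session, the attempt cap exists only so the example terminates, and a blocking `std::thread::sleep` replaces the async timer a tokio task would use.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Outcome of one (hypothetical) connection session.
#[derive(Debug, PartialEq)]
enum Session {
    /// The connect failed or the broker dropped the connection.
    Dropped,
    /// The agent was asked to stop.
    Shutdown,
}

/// Calls `run_session` until it reports `Shutdown` or `max_attempts` is
/// reached, sleeping `delay` between attempts.
/// Returns the number of sessions that were started.
fn run_with_reconnect<F>(mut run_session: F, delay: Duration, max_attempts: u32) -> u32
where
    F: FnMut(u32) -> Session,
{
    let mut attempts = 0;
    while attempts < max_attempts {
        attempts += 1;
        if run_session(attempts) == Session::Shutdown {
            break;
        }
        // Only wait if another attempt will follow.
        if attempts < max_attempts {
            sleep(delay);
        }
    }
    attempts
}

fn main() {
    // Simulate two dropped connections followed by a clean shutdown.
    let attempts = run_with_reconnect(
        |n| if n < 3 { Session::Dropped } else { Session::Shutdown },
        Duration::from_millis(10),
        10,
    );
    println!("sessions started: {}", attempts);
}
```

A fixed delay keeps the sketch short; it mirrors the single reconnect delay setting described under Configuration.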
Installation
[dependencies]
em_filter = "0.1"
serde_json = "1"
tokio = { version = "1", features = ["full"] }
async_trait is re-exported by the crate — no need to add it separately.
Quick start
use ;
use ;
;
async
By default the agent connects to localhost:8080. Override via environment
variables or AgentConfig — see Configuration.
Try the built-in example
The crate ships an echo_filter example — the fastest way to verify that your
em_disco broker is reachable and the handshake works:
# Clone / enter the crate directory, then:
cargo run --example echo_filter
Expected output once connected:
INFO em_filter: Starting em_filter agent agent="echo_filter" nodes=1
INFO em_filter: Connecting to em_disco agent="echo_filter" url="ws://localhost:8080/ws"
INFO em_filter: Registered on em_disco — entering message loop agent="echo_filter"
With a custom broker:
EM_DISCO_HOST=disco.example.com \
EM_DISCO_PORT=443 \
EM_FILTER_JWT_TOKEN=eyJ... \
cargo run --example echo_filter
Test it from the Erlang shell (with em_disco running):
.
%% → [#{<<"type">> => <<"url">>,
%% <<"properties">> => #{<<"title">> => <<"Echo: hello world">>, ...}}]
Building your own filter
Copy the echo example as your starting point:
filters/
└── my_filter/
├── Cargo.toml
└── src/
└── main.rs
Cargo.toml:
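[package]
name = "my_filter"
version = "0.1.0"
edition = "2021"

[dependencies]
em_filter = "0.1"
serde_json = "1"
tokio = { version = "1", features = ["full"] }
tracing-subscriber = "0.3"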
src/main.rs:
use ;
use ;
;
async
The Filter trait
Implement Filter on any struct that holds your agent's state:
use ;
use ;
Result format
handle returns a serde_json::Value — typically a JSON array of embryo objects.
Each embryo has a "type" string and a "properties" map:
| Type | Required properties |
|---|---|
| "url" | url, title |
| "dns" | domain, ips |
| "text" | content |
An empty array (json!([])) or Value::Null means "no results for this query".
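The required-property table can be turned into a small check that a filter runs before returning its embryos. This is a sketch under stated assumptions: `required_properties` and `missing_properties` are hypothetical helpers, not part of em_filter, and property names are passed as plain strings so the example needs no JSON library.

```rust
/// Required property names for each embryo type in the table above.
/// Returns `None` for a type the table does not list.
fn required_properties(kind: &str) -> Option<&'static [&'static str]> {
    let props: &'static [&'static str] = match kind {
        "url" => &["url", "title"],
        "dns" => &["domain", "ips"],
        "text" => &["content"],
        _ => return None,
    };
    Some(props)
}

/// Lists the required properties that are absent from `present`.
/// Returns `None` when the embryo type itself is unknown.
fn missing_properties(kind: &str, present: &[&str]) -> Option<Vec<&'static str>> {
    let required = required_properties(kind)?;
    Some(
        required
            .iter()
            .copied()
            .filter(|name| !present.contains(name))
            .collect(),
    )
}

fn main() {
    // A "url" embryo that forgot its title.
    println!("{:?}", missing_properties("url", &["url"]));
    // A complete "dns" embryo.
    println!("{:?}", missing_properties("dns", &["domain", "ips"]));
}
```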
Capabilities
The capabilities() method returns the list of capabilities your agent advertises.
em_disco uses this to route queries — a query with capabilities = ["dns"] is
delivered only to agents that advertise "dns".
Default: ["search", "query"]. Override to add domain-specific capabilities.
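To illustrate how a default capability list and an override interact with routing, here is a self-contained sketch. The `Capable` trait is a simplified stand-in for the capabilities part of the crate's Filter trait, `EchoFilter` and `DnsFilter` are hypothetical agents, and the rule in `accepts` (an agent must advertise every capability the query names) is an assumption about how em_disco matches multi-capability queries.

```rust
/// Simplified stand-in for the capabilities part of a filter trait.
trait Capable {
    /// Default set, as stated above.
    fn capabilities(&self) -> Vec<String> {
        vec!["search".to_string(), "query".to_string()]
    }
}

/// Keeps the default capabilities.
struct EchoFilter;
/// Adds a domain-specific capability on top of the defaults.
struct DnsFilter;

impl Capable for EchoFilter {}

impl Capable for DnsFilter {
    fn capabilities(&self) -> Vec<String> {
        vec!["search".to_string(), "query".to_string(), "dns".to_string()]
    }
}

/// Assumed routing rule: the agent receives a query only if it advertises
/// every capability the query asks for.
fn accepts(agent: &dyn Capable, query_caps: &[&str]) -> bool {
    let advertised = agent.capabilities();
    query_caps
        .iter()
        .all(|wanted| advertised.iter().any(|have| have.as_str() == *wanted))
}

fn main() {
    println!("echo accepts dns query: {}", accepts(&EchoFilter, &["dns"]));
    println!("dns  accepts dns query: {}", accepts(&DnsFilter, &["dns"]));
}
```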
Configuration
Environment variables
| Variable | Default | Description |
|---|---|---|
| EM_DISCO_HOST | (none) | Disco broker hostname |
| EM_DISCO_PORT | (none) | Disco broker port |
| EM_FILTER_JWT_TOKEN | (none) | JWT for authenticated brokers |
| EM_FILTER_RECONNECT_MS | 5000 | Reconnect delay in milliseconds |
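As an illustration of how a value such as EM_FILTER_RECONNECT_MS might be read, the sketch below parses the raw string into a `Duration`. The helper name `reconnect_delay` is hypothetical, and falling back to the default when the value does not parse is an assumption; the crate may instead report an error.

```rust
use std::time::Duration;

/// Default taken from the table above.
const DEFAULT_RECONNECT_MS: u64 = 5000;

/// Turns the raw value of `EM_FILTER_RECONNECT_MS` into a delay.
/// Assumption: an unset or unparsable value falls back to the default.
fn reconnect_delay(raw: Option<&str>) -> Duration {
    let ms = raw
        .and_then(|s| s.trim().parse::<u64>().ok())
        .unwrap_or(DEFAULT_RECONNECT_MS);
    Duration::from_millis(ms)
}

fn main() {
    // Read the variable from the real environment, if it is set.
    let raw = std::env::var("EM_FILTER_RECONNECT_MS").ok();
    let delay = reconnect_delay(raw.as_deref());
    println!("reconnect delay: {} ms", delay.as_millis());
}
```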
Node resolution order
1. AgentConfig::disco_nodes: explicit list (highest priority)
2. EM_DISCO_HOST / EM_DISCO_PORT env vars
3. [em_disco] nodes = … in emergence.conf
4. localhost:8080: built-in default
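The priority order above amounts to "first non-empty source wins". The following sketch shows that logic in isolation. `resolve_nodes` and the `Node` alias are hypothetical, the three inputs are assumed to be parsed already, and treating an empty list as "not set" is an assumption.

```rust
/// A disco node as host plus port.
type Node = (String, u16);

/// Picks the first non-empty source, in the priority order listed above.
fn resolve_nodes(
    explicit: Vec<Node>,  // AgentConfig::disco_nodes
    env: Option<Node>,    // EM_DISCO_HOST / EM_DISCO_PORT
    conf_file: Vec<Node>, // [em_disco] nodes = … in emergence.conf
) -> Vec<Node> {
    if !explicit.is_empty() {
        return explicit;
    }
    if let Some(node) = env {
        return vec![node];
    }
    if !conf_file.is_empty() {
        return conf_file;
    }
    // Built-in default.
    vec![("localhost".to_string(), 8080)]
}

fn main() {
    // Only the environment provides a node, so it wins over the default.
    let from_env = Some(("disco.example.com".to_string(), 443));
    let nodes = resolve_nodes(Vec::new(), from_env, Vec::new());
    println!("{:?}", nodes);
}
```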
TLS inference
| Host | Port | Transport |
|---|---|---|
| localhost, 127.0.0.1, ::1 | any | ws:// (plain) |
| any other | 443 | wss:// (TLS) |
| any other | other | ws:// (plain) |
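The inference table can be expressed as a small pure function. This is a sketch, not the crate's code: `infer_scheme` and `ws_url` are hypothetical names, the `/ws` path is taken from the example log line `ws://localhost:8080/ws`, and wrapping IPv6 hosts in brackets is an assumption.

```rust
/// Returns the URL scheme for a node, following the table above.
fn infer_scheme(host: &str, port: u16) -> &'static str {
    let is_loopback = matches!(host, "localhost" | "127.0.0.1" | "::1");
    if !is_loopback && port == 443 {
        "wss"
    } else {
        "ws"
    }
}

/// Builds the WebSocket URL for a node.
/// Assumption: hosts containing ':' are IPv6 literals and get bracketed.
fn ws_url(host: &str, port: u16) -> String {
    let host_part = if host.contains(':') {
        format!("[{}]", host)
    } else {
        host.to_string()
    };
    format!("{}://{}:{}/ws", infer_scheme(host, port), host_part, port)
}

fn main() {
    for (host, port) in [("localhost", 8080), ("disco.example.com", 443), ("::1", 9000)] {
        println!("{}", ws_url(host, port));
    }
}
```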
emergence.conf
[em_disco]
nodes = localhost:8080, disco.example.com, [::1]:9000
Platform paths:
- Linux / macOS: ~/.config/emergence/emergence.conf
- Windows: %APPDATA%\emergence\emergence.conf
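The `nodes` value shown above mixes three entry shapes: `host:port`, a bare host, and a bracketed IPv6 literal with a port. A sketch of how such a value could be split is given below. `parse_nodes` is a hypothetical helper, and because this README does not state which port applies when an entry omits one, the sketch returns `None` rather than guessing.

```rust
/// Parses the value of `nodes = …` into (host, optional port) pairs.
fn parse_nodes(value: &str) -> Vec<(String, Option<u16>)> {
    value
        .split(',')
        .map(str::trim)
        .filter(|entry| !entry.is_empty())
        .map(parse_node)
        .collect()
}

/// Parses one entry. Unbracketed IPv6 literals are not supported.
fn parse_node(entry: &str) -> (String, Option<u16>) {
    // Bracketed IPv6 literal: "[::1]:9000" or "[::1]".
    if let Some(rest) = entry.strip_prefix('[') {
        if let Some((host, tail)) = rest.split_once(']') {
            let port = tail.strip_prefix(':').and_then(|p| p.parse().ok());
            return (host.to_string(), port);
        }
    }
    // "host:port", accepted only when the part after the last ':' is a port.
    if let Some((host, port)) = entry.rsplit_once(':') {
        if let Ok(port) = port.parse::<u16>() {
            return (host.to_string(), Some(port));
        }
    }
    // Bare host without a port.
    (entry.to_string(), None)
}

fn main() {
    for node in parse_nodes("localhost:8080, disco.example.com, [::1]:9000") {
        println!("{:?}", node);
    }
}
```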
Programmatic configuration
use ;
let config = AgentConfig ;
Multi-node
FilterRunner connects to all resolved nodes simultaneously. Each node gets its
own tokio task; the filter is shared via Arc<Mutex<F>>. All handler calls are
serialized — one query at a time — regardless of how many nodes are connected.
let config = AgentConfig ;
new
.run
.await
.unwrap;
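The serialization guarantee follows from every node task locking the same mutex for the duration of a handler call. The sketch below demonstrates that effect with the standard library only: OS threads stand in for tokio tasks, `std::sync::Mutex` stands in for whichever mutex the crate uses, and `CountingFilter` is a hypothetical filter that records how many handler calls overlap.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Stand-in filter that counts the queries it has handled.
struct CountingFilter {
    handled: usize,
}

impl CountingFilter {
    /// Tracks the number of concurrent calls while "working".
    fn handle(&mut self, in_flight: &AtomicUsize, max_seen: &AtomicUsize) {
        let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
        max_seen.fetch_max(now, Ordering::SeqCst);
        thread::sleep(Duration::from_millis(2)); // pretend to do work
        self.handled += 1;
        in_flight.fetch_sub(1, Ordering::SeqCst);
    }
}

/// Spawns one worker per node, all sharing one filter behind a mutex.
/// Returns (queries handled, highest number of overlapping handler calls).
fn run_nodes(nodes: usize, queries_per_node: usize) -> (usize, usize) {
    let filter = Arc::new(Mutex::new(CountingFilter { handled: 0 }));
    let in_flight = Arc::new(AtomicUsize::new(0));
    let max_seen = Arc::new(AtomicUsize::new(0));

    let workers: Vec<_> = (0..nodes)
        .map(|_| {
            let filter = Arc::clone(&filter);
            let in_flight = Arc::clone(&in_flight);
            let max_seen = Arc::clone(&max_seen);
            thread::spawn(move || {
                for _ in 0..queries_per_node {
                    // The lock is held for the whole call, which is what
                    // serializes handlers across nodes.
                    filter.lock().unwrap().handle(&in_flight, &max_seen);
                }
            })
        })
        .collect();
    for worker in workers {
        worker.join().unwrap();
    }

    let handled = filter.lock().unwrap().handled;
    (handled, max_seen.load(Ordering::SeqCst))
}

fn main() {
    let (handled, max_overlap) = run_nodes(3, 5);
    println!("handled={} max_overlap={}", handled, max_overlap);
}
```

One consequence of this design is that a slow handler delays queries from every connected node, so long-running work inside a handler limits overall throughput.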
HTML utilities
A set of helpers for processing web pages, useful in web-scraper filters:
use ;
let html = r#"<p>Hello <b>world</b></p><script>alert(1)</script>"#;
// Remove <script> blocks
let clean = strip_scripts.unwrap;
// Extract plain text
let text = get_text; // "Hello world"
// CSS selector extraction
let links = extract_elements;
// Attribute extraction
let href = extract_attribute;
// → Some("/page")
// Entity decoding
let decoded = decode_html_entities;
// → "café & croissant"
// Skip ad / tracker links
let skip = should_skip_link;
// → true
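To give a feel for what entity decoding involves, here is a deliberately small, standard-library-only sketch. It is not the crate's decode_html_entities: the function name `decode_entities` is hypothetical, only a handful of named entities are covered, and unknown or malformed entities are copied through unchanged.

```rust
/// Decodes a small subset of HTML entities: a few named ones plus
/// decimal (`&#233;`) and hexadecimal (`&#xE9;`) character references.
fn decode_entities(input: &str) -> String {
    let mut out = String::with_capacity(input.len());
    let mut rest = input;
    while let Some(amp) = rest.find('&') {
        out.push_str(&rest[..amp]);
        rest = &rest[amp..];
        // Look for the terminating ';' within a short window after '&'.
        let decoded = rest
            .find(';')
            .filter(|&end| end <= 10)
            .and_then(|end| decode_one(&rest[1..end]).map(|c| (c, end)));
        match decoded {
            Some((c, end)) => {
                out.push(c);
                rest = &rest[end + 1..];
            }
            None => {
                // Not a recognized entity: keep the '&' literally.
                out.push('&');
                rest = &rest[1..];
            }
        }
    }
    out.push_str(rest);
    out
}

/// Decodes the text between '&' and ';' into a character, if recognized.
fn decode_one(name: &str) -> Option<char> {
    if let Some(num) = name.strip_prefix('#') {
        let hex = num.strip_prefix('x').or_else(|| num.strip_prefix('X'));
        let code = match hex {
            Some(digits) => u32::from_str_radix(digits, 16).ok()?,
            None => num.parse::<u32>().ok()?,
        };
        return char::from_u32(code);
    }
    match name {
        "amp" => Some('&'),
        "lt" => Some('<'),
        "gt" => Some('>'),
        "quot" => Some('"'),
        "apos" => Some('\''),
        "nbsp" => Some('\u{a0}'),
        "eacute" => Some('é'),
        _ => None,
    }
}

fn main() {
    println!("{}", decode_entities("caf&eacute; &amp; croissant"));
}
```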
WebSocket protocol
The agent speaks a minimal JSON-over-WebSocket protocol to em_disco.
Agent → Disco:
Disco → Agent:
The library handles the handshake and reconnection automatically. Your code only
implements Filter::handle.
Logging
The library uses tracing. Add tracing-subscriber
to your binary crate to see connection logs:
[dependencies]
tracing-subscriber = "0.3"

tracing_subscriber::fmt::init();
Log levels:
- INFO: connection lifecycle (connected, disconnected, registered)
- WARN: connection errors, malformed frames, query ID issues
- ERROR: task panics