Trait daphne::roles::DapLeader

source ·
pub trait DapLeader<'srv, 'req, S>: DapAuthorizedSender<S> + DapAggregator<'srv, 'req, S>where
    'srv: 'req,
{ type ReportSelector; fn put_report<'life0, 'life1, 'async_trait>(
        &'life0 self,
        report: &'life1 Report
    ) -> Pin<Box<dyn Future<Output = Result<(), DapError>> + 'async_trait>>
    where
        'life0: 'async_trait,
        'life1: 'async_trait,
        Self: 'async_trait
; fn get_reports<'life0, 'life1, 'async_trait>(
        &'life0 self,
        selector: &'life1 Self::ReportSelector
    ) -> Pin<Box<dyn Future<Output = Result<HashMap<Id, HashMap<PartialBatchSelector, Vec<Report>>>, DapError>> + 'async_trait>>
    where
        'life0: 'async_trait,
        'life1: 'async_trait,
        Self: 'async_trait
; fn init_collect_job<'life0, 'life1, 'async_trait>(
        &'life0 self,
        collect_req: &'life1 CollectReq
    ) -> Pin<Box<dyn Future<Output = Result<Url, DapError>> + 'async_trait>>
    where
        'life0: 'async_trait,
        'life1: 'async_trait,
        Self: 'async_trait
; fn poll_collect_job<'life0, 'life1, 'life2, 'async_trait>(
        &'life0 self,
        task_id: &'life1 Id,
        collect_id: &'life2 Id
    ) -> Pin<Box<dyn Future<Output = Result<DapCollectJob, DapError>> + 'async_trait>>
    where
        'life0: 'async_trait,
        'life1: 'async_trait,
        'life2: 'async_trait,
        Self: 'async_trait
; fn get_pending_collect_jobs<'life0, 'async_trait>(
        &'life0 self
    ) -> Pin<Box<dyn Future<Output = Result<Vec<(Id, CollectReq)>, DapError>> + 'async_trait>>
    where
        'life0: 'async_trait,
        Self: 'async_trait
; fn finish_collect_job<'life0, 'life1, 'life2, 'life3, 'async_trait>(
        &'life0 self,
        task_id: &'life1 Id,
        collect_id: &'life2 Id,
        collect_resp: &'life3 CollectResp
    ) -> Pin<Box<dyn Future<Output = Result<(), DapError>> + 'async_trait>>
    where
        'life0: 'async_trait,
        'life1: 'async_trait,
        'life2: 'async_trait,
        'life3: 'async_trait,
        Self: 'async_trait
; fn send_http_post<'life0, 'async_trait>(
        &'life0 self,
        req: DapRequest<S>
    ) -> Pin<Box<dyn Future<Output = Result<DapResponse, DapError>> + 'async_trait>>
    where
        'life0: 'async_trait,
        Self: 'async_trait
; fn http_post_upload<'async_trait>(
        &'srv self,
        req: &'req DapRequest<S>
    ) -> Pin<Box<dyn Future<Output = Result<(), DapAbort>> + 'async_trait>>
    where
        'srv: 'async_trait,
        'req: 'async_trait,
        Self: 'async_trait
, { ... } fn http_post_collect<'async_trait>(
        &'srv self,
        req: &'req DapRequest<S>
    ) -> Pin<Box<dyn Future<Output = Result<Url, DapAbort>> + 'async_trait>>
    where
        'srv: 'async_trait,
        'req: 'async_trait,
        Self: 'async_trait
, { ... } fn run_agg_job<'life0, 'life1, 'life2, 'life3, 'async_trait>(
        &'life0 self,
        task_id: &'life1 Id,
        task_config: &'life2 DapTaskConfig,
        part_batch_sel: &'life3 PartialBatchSelector,
        reports: Vec<Report>
    ) -> Pin<Box<dyn Future<Output = Result<u64, DapAbort>> + 'async_trait>>
    where
        'life0: 'async_trait,
        'life1: 'async_trait,
        'life2: 'async_trait,
        'life3: 'async_trait,
        Self: 'async_trait
, { ... } fn run_collect_job<'life0, 'life1, 'life2, 'life3, 'async_trait>(
        &'life0 self,
        collect_id: &'life1 Id,
        task_config: &'life2 DapTaskConfig,
        collect_req: &'life3 CollectReq
    ) -> Pin<Box<dyn Future<Output = Result<u64, DapAbort>> + 'async_trait>>
    where
        'life0: 'async_trait,
        'life1: 'async_trait,
        'life2: 'async_trait,
        'life3: 'async_trait,
        Self: 'async_trait
, { ... } fn process<'life0, 'async_trait>(
        &'srv self,
        selector: &'life0 Self::ReportSelector
    ) -> Pin<Box<dyn Future<Output = Result<DapLeaderProcessTelemetry, DapAbort>> + 'async_trait>>
    where
        'srv: 'async_trait,
        'life0: 'async_trait,
        Self: 'async_trait
, { ... } }
Expand description

DAP Leader functionality.

Required Associated Types

Data type used to guide selection of a set of reports for aggregation.

Required Methods

Store a report for use later on.

Fetch a sequence of reports to aggregate, grouped by task ID, then by partial batch selector. The reports returned are removed from persistent storage.

Create a collect job.

Check the status of a collect job.

Fetch the current collect job queue. The result is the sequence of collect ID and request pairs, in order of priority.

Complete a collect job by assigning it the completed CollectResp.

source

fn send_http_post<'life0, 'async_trait>(
    &'life0 self,
    req: DapRequest<S>
) -> Pin<Box<dyn Future<Output = Result<DapResponse, DapError>> + 'async_trait>>where
    'life0: 'async_trait,
    Self: 'async_trait,

Send an HTTP POST request.

Provided Methods

source

fn http_post_upload<'async_trait>(
    &'srv self,
    req: &'req DapRequest<S>
) -> Pin<Box<dyn Future<Output = Result<(), DapAbort>> + 'async_trait>>where
    'srv: 'async_trait,
    'req: 'async_trait,
    Self: 'async_trait,

Handle HTTP POST to /upload. The input is the encoded report sent in the body of the HTTP request.

source

fn http_post_collect<'async_trait>(
    &'srv self,
    req: &'req DapRequest<S>
) -> Pin<Box<dyn Future<Output = Result<Url, DapAbort>> + 'async_trait>>where
    'srv: 'async_trait,
    'req: 'async_trait,
    Self: 'async_trait,

Handle HTTP POST to /collect. The input is a CollectReq. The return value is a URI that the Collector can poll later on to get the corresponding CollectResp.

Run the aggregation sub-protocol for the given set of reports. Return the number of reports that were aggregated successfully.

Handle a pending collect request. If the results are reasdy, then cokmpute the aggregate results and store them to be retrieved by the Collector later. Returns the number of reports in the batch.

Fetch a set of reports grouped by task, then run an aggregation job for each task. once all jobs completed, process the collect job queue. It is not safe to run multiple instances of this function in parallel.

This method is geared primarily towards testing. It also demonstrates how to properly synchronize collect and aggregation jobs. If used in a large DAP deployment, it is likely create a bottleneck. Such deployments can improve throughput by running many aggregation jobs in parallel.

Implementors