pub trait DapLeader<'srv, 'req, S>: DapAuthorizedSender<S> + DapAggregator<'srv, 'req, S>where
'srv: 'req,{
type ReportSelector;
fn put_report<'life0, 'life1, 'async_trait>(
&'life0 self,
report: &'life1 Report
) -> Pin<Box<dyn Future<Output = Result<(), DapError>> + 'async_trait>>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait;
fn get_reports<'life0, 'life1, 'async_trait>(
&'life0 self,
selector: &'life1 Self::ReportSelector
) -> Pin<Box<dyn Future<Output = Result<HashMap<Id, HashMap<PartialBatchSelector, Vec<Report>>>, DapError>> + 'async_trait>>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait;
fn init_collect_job<'life0, 'life1, 'async_trait>(
&'life0 self,
collect_req: &'life1 CollectReq
) -> Pin<Box<dyn Future<Output = Result<Url, DapError>> + 'async_trait>>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait;
fn poll_collect_job<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
task_id: &'life1 Id,
collect_id: &'life2 Id
) -> Pin<Box<dyn Future<Output = Result<DapCollectJob, DapError>> + 'async_trait>>
where
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Self: 'async_trait;
fn get_pending_collect_jobs<'life0, 'async_trait>(
&'life0 self
) -> Pin<Box<dyn Future<Output = Result<Vec<(Id, CollectReq)>, DapError>> + 'async_trait>>
where
'life0: 'async_trait,
Self: 'async_trait;
fn finish_collect_job<'life0, 'life1, 'life2, 'life3, 'async_trait>(
&'life0 self,
task_id: &'life1 Id,
collect_id: &'life2 Id,
collect_resp: &'life3 CollectResp
) -> Pin<Box<dyn Future<Output = Result<(), DapError>> + 'async_trait>>
where
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
'life3: 'async_trait,
Self: 'async_trait;
fn send_http_post<'life0, 'async_trait>(
&'life0 self,
req: DapRequest<S>
) -> Pin<Box<dyn Future<Output = Result<DapResponse, DapError>> + 'async_trait>>
where
'life0: 'async_trait,
Self: 'async_trait;
fn http_post_upload<'async_trait>(
&'srv self,
req: &'req DapRequest<S>
) -> Pin<Box<dyn Future<Output = Result<(), DapAbort>> + 'async_trait>>
where
'srv: 'async_trait,
'req: 'async_trait,
Self: 'async_trait,
{ ... }
fn http_post_collect<'async_trait>(
&'srv self,
req: &'req DapRequest<S>
) -> Pin<Box<dyn Future<Output = Result<Url, DapAbort>> + 'async_trait>>
where
'srv: 'async_trait,
'req: 'async_trait,
Self: 'async_trait,
{ ... }
fn run_agg_job<'life0, 'life1, 'life2, 'life3, 'async_trait>(
&'life0 self,
task_id: &'life1 Id,
task_config: &'life2 DapTaskConfig,
part_batch_sel: &'life3 PartialBatchSelector,
reports: Vec<Report>
) -> Pin<Box<dyn Future<Output = Result<u64, DapAbort>> + 'async_trait>>
where
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
'life3: 'async_trait,
Self: 'async_trait,
{ ... }
fn run_collect_job<'life0, 'life1, 'life2, 'life3, 'async_trait>(
&'life0 self,
collect_id: &'life1 Id,
task_config: &'life2 DapTaskConfig,
collect_req: &'life3 CollectReq
) -> Pin<Box<dyn Future<Output = Result<u64, DapAbort>> + 'async_trait>>
where
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
'life3: 'async_trait,
Self: 'async_trait,
{ ... }
fn process<'life0, 'async_trait>(
&'srv self,
selector: &'life0 Self::ReportSelector
) -> Pin<Box<dyn Future<Output = Result<DapLeaderProcessTelemetry, DapAbort>> + 'async_trait>>
where
'srv: 'async_trait,
'life0: 'async_trait,
Self: 'async_trait,
{ ... }
}
Expand description
DAP Leader functionality.
Required Associated Types
sourcetype ReportSelector
type ReportSelector
Data type used to guide selection of a set of reports for aggregation.
Required Methods
sourcefn put_report<'life0, 'life1, 'async_trait>(
&'life0 self,
report: &'life1 Report
) -> Pin<Box<dyn Future<Output = Result<(), DapError>> + 'async_trait>>where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
fn put_report<'life0, 'life1, 'async_trait>(
&'life0 self,
report: &'life1 Report
) -> Pin<Box<dyn Future<Output = Result<(), DapError>> + 'async_trait>>where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
Store a report for use later on.
sourcefn get_reports<'life0, 'life1, 'async_trait>(
&'life0 self,
selector: &'life1 Self::ReportSelector
) -> Pin<Box<dyn Future<Output = Result<HashMap<Id, HashMap<PartialBatchSelector, Vec<Report>>>, DapError>> + 'async_trait>>where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
fn get_reports<'life0, 'life1, 'async_trait>(
&'life0 self,
selector: &'life1 Self::ReportSelector
) -> Pin<Box<dyn Future<Output = Result<HashMap<Id, HashMap<PartialBatchSelector, Vec<Report>>>, DapError>> + 'async_trait>>where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
Fetch a sequence of reports to aggregate, grouped by task ID, then by partial batch selector. The reports returned are removed from persistent storage.
sourcefn init_collect_job<'life0, 'life1, 'async_trait>(
&'life0 self,
collect_req: &'life1 CollectReq
) -> Pin<Box<dyn Future<Output = Result<Url, DapError>> + 'async_trait>>where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
fn init_collect_job<'life0, 'life1, 'async_trait>(
&'life0 self,
collect_req: &'life1 CollectReq
) -> Pin<Box<dyn Future<Output = Result<Url, DapError>> + 'async_trait>>where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
Create a collect job.
sourcefn poll_collect_job<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
task_id: &'life1 Id,
collect_id: &'life2 Id
) -> Pin<Box<dyn Future<Output = Result<DapCollectJob, DapError>> + 'async_trait>>where
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Self: 'async_trait,
fn poll_collect_job<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
task_id: &'life1 Id,
collect_id: &'life2 Id
) -> Pin<Box<dyn Future<Output = Result<DapCollectJob, DapError>> + 'async_trait>>where
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Self: 'async_trait,
Check the status of a collect job.
sourcefn get_pending_collect_jobs<'life0, 'async_trait>(
&'life0 self
) -> Pin<Box<dyn Future<Output = Result<Vec<(Id, CollectReq)>, DapError>> + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn get_pending_collect_jobs<'life0, 'async_trait>(
&'life0 self
) -> Pin<Box<dyn Future<Output = Result<Vec<(Id, CollectReq)>, DapError>> + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
Fetch the current collect job queue. The result is the sequence of collect ID and request pairs, in order of priority.
sourcefn finish_collect_job<'life0, 'life1, 'life2, 'life3, 'async_trait>(
&'life0 self,
task_id: &'life1 Id,
collect_id: &'life2 Id,
collect_resp: &'life3 CollectResp
) -> Pin<Box<dyn Future<Output = Result<(), DapError>> + 'async_trait>>where
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
'life3: 'async_trait,
Self: 'async_trait,
fn finish_collect_job<'life0, 'life1, 'life2, 'life3, 'async_trait>(
&'life0 self,
task_id: &'life1 Id,
collect_id: &'life2 Id,
collect_resp: &'life3 CollectResp
) -> Pin<Box<dyn Future<Output = Result<(), DapError>> + 'async_trait>>where
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
'life3: 'async_trait,
Self: 'async_trait,
Complete a collect job by assigning it the completed CollectResp.
sourcefn send_http_post<'life0, 'async_trait>(
&'life0 self,
req: DapRequest<S>
) -> Pin<Box<dyn Future<Output = Result<DapResponse, DapError>> + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn send_http_post<'life0, 'async_trait>(
&'life0 self,
req: DapRequest<S>
) -> Pin<Box<dyn Future<Output = Result<DapResponse, DapError>> + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
Send an HTTP POST request.
Provided Methods
sourcefn http_post_upload<'async_trait>(
&'srv self,
req: &'req DapRequest<S>
) -> Pin<Box<dyn Future<Output = Result<(), DapAbort>> + 'async_trait>>where
'srv: 'async_trait,
'req: 'async_trait,
Self: 'async_trait,
fn http_post_upload<'async_trait>(
&'srv self,
req: &'req DapRequest<S>
) -> Pin<Box<dyn Future<Output = Result<(), DapAbort>> + 'async_trait>>where
'srv: 'async_trait,
'req: 'async_trait,
Self: 'async_trait,
Handle HTTP POST to /upload. The input is the encoded report sent in the body of the HTTP request.
sourcefn http_post_collect<'async_trait>(
&'srv self,
req: &'req DapRequest<S>
) -> Pin<Box<dyn Future<Output = Result<Url, DapAbort>> + 'async_trait>>where
'srv: 'async_trait,
'req: 'async_trait,
Self: 'async_trait,
fn http_post_collect<'async_trait>(
&'srv self,
req: &'req DapRequest<S>
) -> Pin<Box<dyn Future<Output = Result<Url, DapAbort>> + 'async_trait>>where
'srv: 'async_trait,
'req: 'async_trait,
Self: 'async_trait,
Handle HTTP POST to /collect. The input is a CollectReq.
The return value is a URI that the Collector can poll later on to get the corresponding CollectResp.
sourcefn run_agg_job<'life0, 'life1, 'life2, 'life3, 'async_trait>(
&'life0 self,
task_id: &'life1 Id,
task_config: &'life2 DapTaskConfig,
part_batch_sel: &'life3 PartialBatchSelector,
reports: Vec<Report>
) -> Pin<Box<dyn Future<Output = Result<u64, DapAbort>> + 'async_trait>>where
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
'life3: 'async_trait,
Self: 'async_trait,
fn run_agg_job<'life0, 'life1, 'life2, 'life3, 'async_trait>(
&'life0 self,
task_id: &'life1 Id,
task_config: &'life2 DapTaskConfig,
part_batch_sel: &'life3 PartialBatchSelector,
reports: Vec<Report>
) -> Pin<Box<dyn Future<Output = Result<u64, DapAbort>> + 'async_trait>>where
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
'life3: 'async_trait,
Self: 'async_trait,
Run the aggregation sub-protocol for the given set of reports. Return the number of reports that were aggregated successfully.
sourcefn run_collect_job<'life0, 'life1, 'life2, 'life3, 'async_trait>(
&'life0 self,
collect_id: &'life1 Id,
task_config: &'life2 DapTaskConfig,
collect_req: &'life3 CollectReq
) -> Pin<Box<dyn Future<Output = Result<u64, DapAbort>> + 'async_trait>>where
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
'life3: 'async_trait,
Self: 'async_trait,
fn run_collect_job<'life0, 'life1, 'life2, 'life3, 'async_trait>(
&'life0 self,
collect_id: &'life1 Id,
task_config: &'life2 DapTaskConfig,
collect_req: &'life3 CollectReq
) -> Pin<Box<dyn Future<Output = Result<u64, DapAbort>> + 'async_trait>>where
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
'life3: 'async_trait,
Self: 'async_trait,
Handle a pending collect request. If the results are ready, then compute the aggregate results and store them to be retrieved by the Collector later. Returns the number of reports in the batch.
sourcefn process<'life0, 'async_trait>(
&'srv self,
selector: &'life0 Self::ReportSelector
) -> Pin<Box<dyn Future<Output = Result<DapLeaderProcessTelemetry, DapAbort>> + 'async_trait>>where
'srv: 'async_trait,
'life0: 'async_trait,
Self: 'async_trait,
fn process<'life0, 'async_trait>(
&'srv self,
selector: &'life0 Self::ReportSelector
) -> Pin<Box<dyn Future<Output = Result<DapLeaderProcessTelemetry, DapAbort>> + 'async_trait>>where
'srv: 'async_trait,
'life0: 'async_trait,
Self: 'async_trait,
Fetch a set of reports grouped by task, then run an aggregation job for each task. Once all jobs have completed, process the collect job queue. It is not safe to run multiple instances of this function in parallel.
This method is geared primarily towards testing. It also demonstrates how to properly synchronize collect and aggregation jobs. If used in a large DAP deployment, it is likely to create a bottleneck. Such deployments can improve throughput by running many aggregation jobs in parallel.