Crate daphne_worker

source ·
Expand description

Daphne-Worker implements a Workers backend for Daphne.

This software is intended to support experimental DAP deployments and is not yet suitable for use in production.

Using Daphne-Worker

The code is intended to be packaged with workers-rs. See DaphneWorkerRouter for usage instructions.

Architecture

Daphne-Worker uses Durable Objects (DOs) for transactional storage.

Report Storage (Leader-only)

The ReportsPending DO is used by the Leader to temporarily store reports uploaded by Clients while they wait to be aggregated. In order to allow the Leader to horizontally scale with the upload rate, reports are mapped to one of a number of DO instances running on the Workers platform simultaneously. Reports are partitioned first by task, then by report storage epoch (see report_storage_epoch_duration in DapGlobalConfig), then into one of a number of shards. The naming scheme for ReportsPending instances is as follows:

   <version>/task/<task_id>/epoch/<epoch>/shard/<shard>

where <version> is the DAP version, <task_id> is a task ID, <epoch> is the report’s epoch (the report timestamp truncated by the report storage epoch duration), and <shard> is a an integer in range [0, DAP_REPORT_SHARD_COUNT). The shard is determined by applying a keyed has function to the report’s ID. (The key is DAP_REPORT_SHARD_KEY.)

Report Metadata Storage (Leader and Helper)

The ReportsProcessed DO is used by the Leader and Helper to keep track of the set of reports that have been aggregated for a given task. The naming scheme for ReportsProcessed is the same as ReportsPending:

   <version>/task/<task_id>/epoch/<epoch>/shard/<shard>

where <version> is the DAP version, <task_id> the task ID0, <epoch> the report’s epoch, and <shard> is the report’s shard.

Aggregate Storage (Leader and Helper)

The AggregateStore DO is used by the Leader and Helper to store aggregate shares that are ready to be collected. Aggregate shares are partitioned first by task, then by batch bucket. (See DapBatchBucket.) Each instance of this DO contains an aggregate share and report count. The naming convention for instances of the AggregateStore DO is as follows:

[Time-interval tasks] <version>/task/<task_id>/window/<window>
[Fixed-size tasks]    <version>/task/<task_id>/batch/<batch_id>

where is the DAP version, <task_id> is the task ID, <window> is a batch window, and <batch_id> is a batch ID. A batch window is a UNIX timestamp (in seconds) truncated by the time precision for the task. (See the time_precision paramaeter of DapTaskConfig.)

Aggregation Jobs (Leader-only)

NOTE: This scheme is not expected to scale well. Currently it is only suited for driving end-to-end tests.

The LeaderAggregationJobQueue DO is used by the Leader to queue aggregation jobs. There is just one instance of this DO; once a ReportsPending instance becomes non-empty, it sends a message to LeaderAggregationJobQueue indicating when the instance was created.

Aggregation jobs are driven by the Leader’s main processing loop (see DapLeader::process()). The report selector for Daphne-Worker, DaphneWorkerReportSelector, indicates the number of jobs to fetch at once (max_agg_jobs) and the number of reports to drain per job (max_reports).

Jobs are handled roughly in order of creation (oldest jobs are handled first). The time at which an aggregation job was created is used determine the order in which it was processed. Timestamps are truncated to the second; ties are broken by a nonce generated at creation time.

Collection Jobs (Leader-only)

NOTE: This scheme is not expected to scale well. Currently it is only suited for driving end-to-end tests.

The LeaderCollectionJobQueue DO is used by the Leader to queue collection jobs. There is just one instance of this DO; once the Leader gets a collect request from the Collector, it adds a job to the queue.

A “collection job ID” is computed for each job and incorporated into the collect URI for the Collector to poll later on. The job ID Is derived by applying a keyed hash to the serialized CollectReq message. (The key is DAP_COLLECT_ID_KEY.)

They are driven by the Leader’s main processing loop. After all aggregation jobs are complete, the entire queue is fetched from LeaderCollectionJobQueue. In the order in which they were created, the Leader checksto see if the job can be completed (i.e., the span of batch buckets contains a sufficient number of reports).

Batch Queue (Leader-only).

NOTE: This scheme is not expected to scale well. Currently it is only suited for driving end-to-end tests.

The LeaderBatchQueue DO is used for fixed-size tasks in order to assign reports to batches. The naming scheme for instances of this DO is as follows:

    <version>/task/<task_id>

where <version> is the DAP version and <task_id> is the task ID. Each instance maintains a queue of batch. When a set of reports is drained from a ReportsPending instance, the the batch in the front of the queue is filled first; if the batch is saturated (i.e., the target batch size is met) then the batch is removed from the queue and the process is repeated.

Storage of the Helper’s State (Helper-only)

The HelperStateStore DO is used to store the Helper’s state (DapHelperState) during the aggregation sub-protocool. It is used to carry state across HTTP requests. The naming scheme for instances of the DO is as follows:

    <version>/task/<task_id>/agg_job/<agg_job_id>

where <version> is the DAP version, <task_id> is the task ID, and <agg_job_id> is the aggregation job ID.

Environment Variables

The runtime behavior of Daphne-Worker is controlled by the environment variables defined in the table below.

NOTE: There are additional, undocumented environment variables. These will be lifted to the documentation once we decide what we need for production.

NameTypeSecret?Description
DAP_AGGREGATOR_ROLEStringnoAggregator role, either “leader” or “helper”.
DAP_COLLECT_ID_KEYStringyesHex-encoded key used to derive the collection job ID from the collect request
DAP_GLOBAL_CONFIGDapGlobalConfignoDAP global config.
DAP_DEPLOYMENTStringnoDeployment type, only “prod” for now.
DAP_REPORT_SHARD_COUNTu64noNumber of report shards per storage epoch.
DAP_REPORT_SHARD_KEYStringyesHex-encoded key used to hash a report into one of the report shards.

Structs

Parameters used by the Leader to select a set of reports for aggregation.
HTTP request handler for Daphne-Worker.