pie 0.2.0

Pie: A Programmable LLM Serving System
Documentation
use crate::api::core::{Blob, BlobResult};
use crate::api::inferlet;
use crate::instance::InstanceState;
use crate::messaging::{PubSubCommand, PushPullCommand, dispatch_i2i, dispatch_u2i};
use crate::server;
use async_trait::async_trait;
use std::mem;
use tokio::sync::{mpsc, oneshot};
use wasmtime::component::Resource;
use wasmtime_wasi::WasiView;
use wasmtime_wasi::p2::{DynPollable, Pollable, subscribe};

#[derive(Debug)]
pub struct Subscription {
    id: usize,
    topic: String,
    receiver: mpsc::Receiver<String>,
    result: Option<String>,
    done: bool,
}

#[derive(Debug)]
pub struct ReceiveResult {
    receiver: oneshot::Receiver<String>,
    result: Option<String>,
    done: bool,
}

#[async_trait]
impl Pollable for ReceiveResult {
    async fn ready(&mut self) {
        if self.done {
            return;
        }
        let res = (&mut self.receiver).await.unwrap();
        self.result = Some(res);
        self.done = true;
    }
}

#[async_trait]
impl Pollable for Subscription {
    async fn ready(&mut self) {
        if self.done {
            return;
        }
        if let Some(result) = self.receiver.recv().await {
            self.result = Some(result);
            self.done = true;
        } else {
            self.done = true;
        }
    }
}

impl inferlet::core::message::Host for InstanceState {
    async fn send(&mut self, message: String) -> anyhow::Result<()> {
        server::Command::Send {
            inst_id: self.id(),
            message,
        }
        .dispatch()?;
        Ok(())
    }

    async fn receive(&mut self) -> anyhow::Result<Resource<ReceiveResult>> {
        let (tx, rx) = oneshot::channel();
        dispatch_u2i(PushPullCommand::Pull {
            topic: self.id().to_string(),
            message: tx,
        });
        let res = ReceiveResult {
            receiver: rx,
            result: None,
            done: false,
        };
        Ok(self.ctx().table.push(res)?)
    }

    async fn send_blob(&mut self, blob: Resource<Blob>) -> anyhow::Result<()> {
        let data = mem::take(&mut self.ctx().table.get_mut(&blob)?.data);

        server::Command::SendBlob {
            inst_id: self.id(),
            data,
        }
        .dispatch()?;
        Ok(())
    }

    async fn receive_blob(&mut self) -> anyhow::Result<Resource<BlobResult>> {
        let (tx, rx) = oneshot::channel();
        dispatch_u2i(PushPullCommand::PullBlob {
            topic: self.id().to_string(),
            message: tx,
        });
        let res = BlobResult {
            receiver: rx,
            result: None,
            done: false,
        };
        Ok(self.ctx().table.push(res)?)
    }

    async fn broadcast(&mut self, topic: String, message: String) -> anyhow::Result<()> {
        dispatch_i2i(PubSubCommand::Publish { topic, message });
        Ok(())
    }

    async fn subscribe(&mut self, topic: String) -> anyhow::Result<Resource<Subscription>> {
        let (tx, rx) = mpsc::channel(64);
        let (sub_tx, sub_rx) = oneshot::channel();
        dispatch_i2i(PubSubCommand::Subscribe {
            topic: topic.clone(),
            sender: tx,
            sub_id: sub_tx,
        });
        let sub_id = sub_rx.await?;
        let sub = Subscription {
            id: sub_id,
            topic,
            receiver: rx,
            result: None,
            done: false,
        };
        Ok(self.ctx().table.push(sub)?)
    }
}

impl inferlet::core::message::HostReceiveResult for InstanceState {
    async fn pollable(
        &mut self,
        this: Resource<ReceiveResult>,
    ) -> anyhow::Result<Resource<DynPollable>> {
        subscribe(self.ctx().table, this)
    }

    async fn get(&mut self, this: Resource<ReceiveResult>) -> anyhow::Result<Option<String>> {
        Ok(self.ctx().table.get_mut(&this)?.result.clone())
    }

    async fn drop(&mut self, this: Resource<ReceiveResult>) -> anyhow::Result<()> {
        self.ctx().table.delete(this)?;
        Ok(())
    }
}

impl inferlet::core::message::HostSubscription for InstanceState {
    async fn pollable(
        &mut self,
        this: Resource<Subscription>,
    ) -> anyhow::Result<Resource<DynPollable>> {
        subscribe(self.ctx().table, this)
    }

    async fn get(&mut self, this: Resource<Subscription>) -> anyhow::Result<Option<String>> {
        Ok(mem::take(&mut self.ctx().table.get_mut(&this)?.result))
    }

    async fn unsubscribe(&mut self, this: Resource<Subscription>) -> anyhow::Result<()> {
        let sub = self.ctx().table.get_mut(&this)?;
        sub.done = true;
        let topic = sub.topic.clone();
        let sub_id = sub.id;
        dispatch_i2i(PubSubCommand::Unsubscribe { topic, sub_id });
        Ok(())
    }

    async fn drop(&mut self, this: Resource<Subscription>) -> anyhow::Result<()> {
        self.ctx().table.delete(this)?;
        Ok(())
    }
}