lark_actor/
lib.rs

1use std::collections::VecDeque;
2use std::sync::mpsc::{channel, Receiver, RecvError, Sender, TryRecvError};
3use std::thread;
4use url::Url;
5
6use languageserver_types::{Position, Range};
7
8pub type TaskId = usize;
9
10/// Requests to the query system.
11#[derive(Debug)]
12pub enum QueryRequest {
13    TypeAtPosition(TaskId, Url, Position),
14    RenameAtPosition(TaskId, Url, Position, String),
15    DefinitionAtPosition(TaskId, Url, Position),
16    ReferencesAtPosition(TaskId, Url, Position, bool),
17    OpenFile(Url, String),
18    EditFile(Url, Vec<(Range, String)>),
19    Initialize(TaskId),
20}
21impl QueryRequest {
22    /// True if this query will cause us to mutate the state of the
23    /// program.
24    pub fn is_mutation(&self) -> bool {
25        match self {
26            QueryRequest::OpenFile(..)
27            | QueryRequest::EditFile(..)
28            | QueryRequest::RenameAtPosition(..)
29            | QueryRequest::Initialize(..) => true,
30            QueryRequest::TypeAtPosition(..) => false,
31            QueryRequest::DefinitionAtPosition(..) => false,
32            QueryRequest::ReferencesAtPosition(..) => false,
33        }
34    }
35}
36
37/// Responses back to the LSP services from
38/// the query system.
39pub enum LspResponse {
40    Type(TaskId, String),
41    Range(TaskId, Url, Range),
42    Ranges(TaskId, Vec<(Url, Range)>),
43    WorkspaceEdits(TaskId, Vec<(Url, Range, String)>),
44    Completions(TaskId, Vec<(String, String)>),
45    Initialized(TaskId),
46    Nothing(TaskId),
47    Diagnostics(Url, Vec<(Range, String)>),
48}
49
50/// An actor in the task system. This gives a uniform way to
51/// create, control, message, and shutdown concurrent workers.
52pub trait Actor {
53    type InMessage: Send + Sync + 'static;
54
55    /// Invoked when new message(s) arrive. Contains all the messages
56    /// that can be pulled at this time. The actor is free to process
57    /// as many as they like. So long as messages remain in the
58    /// dequeue, we'll just keep calling back (possibly appending more
59    /// messages to the back). Once the queue is empty, we'll block
60    /// until we can fetch more.
61    ///
62    /// The intended workflow is as follows:
63    ///
64    /// - If desired, inspect `messages` and prune messages that become outdated
65    ///   due to later messages in the queue.
66    /// - Invoke `messages.pop_front().unwrap()` and process that message,
67    ///   then return.
68    ///   - In particular, it is probably better to return than to eagerly process
69    ///     all messages in the queue, as it gives the actor a chance to add more
70    ///     messages if they have arrived in the meantime.
71    ///     - This is only important if you are trying to remove outdated messages.
72    fn receive_messages(&mut self, messages: &mut VecDeque<Self::InMessage>);
73}
74
75pub struct ActorControl<MessageType: Send + Sync + 'static> {
76    pub channel: Sender<MessageType>,
77    pub join_handle: std::thread::JoinHandle<()>,
78}
79
80pub fn spawn_actor<T: Actor + Send + 'static>(mut actor: T) -> ActorControl<T::InMessage> {
81    let (actor_tx, actor_rx) = channel();
82    let mut message_queue = VecDeque::default();
83
84    let handle = thread::spawn(move || loop {
85        match push_all_pending(&actor_rx, &mut message_queue) {
86            Ok(()) => {
87                actor.receive_messages(&mut message_queue);
88            }
89            Err(error) => {
90                match error {
91                    PushAllPendingError::Disconnected => {
92                        eprintln!("Failure during top-level message receive");
93                    }
94                }
95
96                break;
97            }
98        }
99    });
100
101    ActorControl {
102        channel: actor_tx,
103        join_handle: handle,
104    }
105}
106
107enum PushAllPendingError {
108    Disconnected,
109}
110
111fn push_all_pending<T>(rx: &Receiver<T>, vec: &mut VecDeque<T>) -> Result<(), PushAllPendingError> {
112    // If the queue is currently empty, then block until we get at
113    // least one message.
114    if vec.is_empty() {
115        match rx.recv() {
116            Ok(m) => vec.push_back(m),
117            Err(RecvError) => return Err(PushAllPendingError::Disconnected),
118        }
119    }
120
121    // Once the queue is non-empty, opportunistically poll for more.
122    loop {
123        match rx.try_recv() {
124            Ok(m) => vec.push_back(m),
125            Err(TryRecvError::Empty) => break Ok(()),
126            Err(TryRecvError::Disconnected) => break Err(PushAllPendingError::Disconnected),
127        }
128    }
129}