// cubecl_runtime/channel/mutex.rs
use super::ComputeChannel;
use crate::server::{Binding, ComputeServer, CubeCount, Handle};
use crate::storage::BindingResource;
use crate::ExecutionMode;
use alloc::sync::Arc;
use alloc::vec::Vec;
use cubecl_common::benchmark::TimestampsResult;
use spin::Mutex;

/// The MutexComputeChannel ensures thread-safety by locking the server
/// on every operation.
///
/// The server is stored behind an `Arc<Mutex<_>>`, so cloning the channel
/// produces another handle to the same underlying server rather than a copy
/// of it.
#[derive(Debug)]
pub struct MutexComputeChannel<Server> {
    /// The wrapped server, shared between all clones of this channel and
    /// guarded by a `spin::Mutex`.
    server: Arc<Mutex<Server>>,
}

impl<S> Clone for MutexComputeChannel<S> {
    fn clone(&self) -> Self {
        Self {
            server: self.server.clone(),
        }
    }
}
impl<Server> MutexComputeChannel<Server>
where
    Server: ComputeServer,
{
    /// Create a new mutex compute channel.
    ///
    /// Takes ownership of `server` and places it behind an `Arc<Mutex<_>>`, so
    /// that every clone of the returned channel operates on this same server.
    pub fn new(server: Server) -> Self {
        let guarded = Mutex::new(server);
        let shared = Arc::new(guarded);
        Self { server: shared }
    }
}

impl<Server> ComputeChannel<Server> for MutexComputeChannel<Server>
where
    Server: ComputeServer,
{
    /// Forwards `handle` to the server's `read` and awaits the resulting bytes.
    ///
    /// The lock is held only while the server builds the future, not while the
    /// future is awaited.
    async fn read(&self, handle: Binding) -> Vec<u8> {
        // Nb: ordering matters. The guard lives in its own scope so that it is
        // dropped before the future is polled; awaiting inline as
        // `lock().read().await` would keep the lock held and can deadlock.
        let pending = {
            let mut guard = self.server.lock();
            guard.read(handle)
        };
        pending.await
    }

    /// Locks the server and forwards `binding` to its `get_resource`.
    fn get_resource(&self, binding: Binding) -> BindingResource<Server> {
        let mut guard = self.server.lock();
        guard.get_resource(binding)
    }

    /// Locks the server and forwards `data` to its `create`, returning the
    /// handle the server produced.
    fn create(&self, data: &[u8]) -> Handle {
        let mut guard = self.server.lock();
        guard.create(data)
    }

    /// Locks the server and forwards `size` to its `empty`, returning the
    /// handle the server produced.
    fn empty(&self, size: usize) -> Handle {
        let mut guard = self.server.lock();
        guard.empty(size)
    }

    /// Locks the server and forwards all arguments, unchanged, to its `execute`.
    ///
    /// # Safety
    ///
    /// This method adds no checks of its own. Callers must uphold whatever
    /// contract `ComputeChannel::execute` and the server's `execute` require
    /// for the given `kernel`, `count`, `handles` and `kind`.
    unsafe fn execute(
        &self,
        kernel: Server::Kernel,
        count: CubeCount,
        handles: Vec<Binding>,
        kind: ExecutionMode,
    ) {
        let mut guard = self.server.lock();
        guard.execute(kernel, count, handles, kind)
    }

    /// Locks the server and calls its `flush`.
    fn flush(&self) {
        let mut guard = self.server.lock();
        guard.flush();
    }

    /// Obtains the server's `sync` future under the lock, then awaits it with
    /// the lock released.
    async fn sync(&self) {
        // Nb: ordering matters. The guard lives in its own scope so that it is
        // dropped before the future is polled; awaiting inline as
        // `lock().sync().await` would keep the lock held and can deadlock.
        let pending = {
            let mut guard = self.server.lock();
            guard.sync()
        };
        pending.await
    }

    /// Obtains the server's `sync_elapsed` future under the lock, then awaits
    /// it with the lock released.
    async fn sync_elapsed(&self) -> TimestampsResult {
        // Nb: ordering matters. The guard lives in its own scope so that it is
        // dropped before the future is polled; awaiting inline as
        // `lock().sync_elapsed().await` would keep the lock held and can deadlock.
        let pending = {
            let mut guard = self.server.lock();
            guard.sync_elapsed()
        };
        pending.await
    }

    /// Locks the server and returns the result of its `memory_usage`.
    fn memory_usage(&self) -> crate::memory_management::MemoryUsage {
        // The guard is a temporary here; it is released once the result has
        // been produced.
        self.server.lock().memory_usage()
    }

    /// Locks the server and calls its `enable_timestamps`.
    fn enable_timestamps(&self) {
        let mut guard = self.server.lock();
        guard.enable_timestamps();
    }

    /// Locks the server and calls its `disable_timestamps`.
    fn disable_timestamps(&self) {
        let mut guard = self.server.lock();
        guard.disable_timestamps();
    }
}